summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHari Khalsa <hkhalsa@10gen.com>2013-07-08 14:44:32 -0400
committerHari Khalsa <hkhalsa@10gen.com>2013-07-09 14:10:54 -0400
commitb8f0ec598013009c56dee527e76429ffa7b8c394 (patch)
treea7cf31d7fbf6bc525f4b1ded421dad8a97aed9a5
parentfedd312c424fc5f82f7be1dad13f3dd74403c4a4 (diff)
downloadmongo-b8f0ec598013009c56dee527e76429ffa7b8c394.tar.gz
SERVER-10026 fetch limit skip or
-rw-r--r--jstests/stages_fetch.js33
-rw-r--r--jstests/stages_limit_skip.js34
-rw-r--r--jstests/stages_or.js33
-rw-r--r--src/mongo/SConscript14
-rw-r--r--src/mongo/db/exec/SConscript51
-rw-r--r--src/mongo/db/exec/fetch.cpp171
-rw-r--r--src/mongo/db/exec/fetch.h69
-rw-r--r--src/mongo/db/exec/limit.cpp52
-rw-r--r--src/mongo/db/exec/limit.h52
-rw-r--r--src/mongo/db/exec/mock_stage.cpp52
-rw-r--r--src/mongo/db/exec/mock_stage.h73
-rw-r--r--src/mongo/db/exec/or.cpp111
-rw-r--r--src/mongo/db/exec/or.h68
-rw-r--r--src/mongo/db/exec/plan_stage.h29
-rw-r--r--src/mongo/db/exec/simple_plan_runner.h27
-rw-r--r--src/mongo/db/exec/skip.cpp58
-rw-r--r--src/mongo/db/exec/skip.h51
-rw-r--r--src/mongo/db/exec/stagedebug_cmd.cpp58
-rw-r--r--src/mongo/db/matcher/matcher.cpp1
-rw-r--r--src/mongo/db/pdfile.h3
-rw-r--r--src/mongo/dbtests/query_stage_fetch.cpp389
-rw-r--r--src/mongo/dbtests/query_stage_limit_skip.cpp92
22 files changed, 1498 insertions, 23 deletions
diff --git a/jstests/stages_fetch.js b/jstests/stages_fetch.js
new file mode 100644
index 00000000000..3e2c01df91a
--- /dev/null
+++ b/jstests/stages_fetch.js
@@ -0,0 +1,33 @@
+// Test basic fetch functionality: an index scan alone cannot filter on a field that is not in
+// the index, but wrapping it in a fetch stage makes the full document available to the filter.
+t = db.stages_fetch;
+t.drop();
+
+var N = 50;
+for (var i = 0; i < N; ++i) {
+    t.insert({foo: i, bar: N - i, baz: i});
+}
+
+t.ensureIndex({foo: 1});
+
+// 20 <= foo <= 30
+// bar == 25 (not covered, should error.)
+ixscan1 = {ixscan: {args:{name: "stages_fetch", keyPattern:{foo:1},
+                          startKey: {"": 20},
+                          endKey: {"" : 30}, endKeyInclusive: true,
+                          direction: 1},
+                    filter: {bar: 25}}};
+res = db.runCommand({stageDebug: ixscan1});
+assert(db.getLastError());
+assert.eq(res.ok, 0);
+
+// Now, add a fetch. We should be able to filter on the non-covered field since we fetched the obj.
+ixscan2 = {ixscan: {args:{name: "stages_fetch", keyPattern:{foo:1},
+                          startKey: {"": 20},
+                          endKey: {"" : 30}, endKeyInclusive: true,
+                          direction: 1}}};
+fetch = {fetch: {args: {node: ixscan2}, filter: {bar: 25}}};
+res = db.runCommand({stageDebug: fetch});
+printjson(res);
+assert(!db.getLastError());
+assert.eq(res.ok, 1);
+// bar == N - foo, so bar == 25 matches only the document with foo == 25.
+assert.eq(res.results.length, 1);
diff --git a/jstests/stages_limit_skip.js b/jstests/stages_limit_skip.js
new file mode 100644
index 00000000000..9441e4cd65b
--- /dev/null
+++ b/jstests/stages_limit_skip.js
@@ -0,0 +1,34 @@
+// Test limit and skip
+t = db.stages_limit_skip;
+t.drop();
+
+var N = 50;
+for (var i = 0; i < N; ++i) {
+    t.insert({foo: i, bar: N - i, baz: i});
+}
+
+t.ensureIndex({foo: 1});
+
+// foo <= 20, decreasing
+// Limit of 5 results.
+ixscan1 = {ixscan: {args:{name: "stages_limit_skip", keyPattern:{foo: 1},
+                          startKey: {"": 20},
+                          endKey: {}, endKeyInclusive: true,
+                          direction: -1}}};
+limit1 = {limit: {args: {node: ixscan1, num: 5}}};
+res = db.runCommand({stageDebug: limit1});
+assert(!db.getLastError());
+assert.eq(res.ok, 1);
+assert.eq(res.results.length, 5);
+assert.eq(res.results[0].foo, 20);
+assert.eq(res.results[4].foo, 16);
+
+// foo <= 20, decreasing
+// Skip 5 results.
+// The scan yields foo = 20..0 (21 documents); dropping the first 5 leaves 15..0 (16 documents).
+skip1 = {skip: {args: {node: ixscan1, num: 5}}};
+res = db.runCommand({stageDebug: skip1});
+assert(!db.getLastError());
+assert.eq(res.ok, 1);
+assert.eq(res.results.length, 16);
+assert.eq(res.results[0].foo, 15);
+assert.eq(res.results[res.results.length - 1].foo, 0);
diff --git a/jstests/stages_or.js b/jstests/stages_or.js
new file mode 100644
index 00000000000..bb0e02b11d4
--- /dev/null
+++ b/jstests/stages_or.js
@@ -0,0 +1,33 @@
+// Test basic OR functionality
+t = db.stages_or;
+t.drop();
+
+var N = 50;
+for (var i = 0; i < N; ++i) {
+    t.insert({foo: i, bar: N - i, baz: i});
+}
+
+t.ensureIndex({foo: 1});
+t.ensureIndex({bar: 1});
+t.ensureIndex({baz: 1});
+
+// baz >= 40
+ixscan1 = {ixscan: {args:{name: "stages_or", keyPattern:{baz: 1},
+                          startKey: {"": 40}, endKey: {},
+                          endKeyInclusive: true, direction: 1}}};
+// foo >= 40
+ixscan2 = {ixscan: {args:{name: "stages_or", keyPattern:{foo: 1},
+                          startKey: {"": 40}, endKey: {},
+                          endKeyInclusive: true, direction: 1}}};
+
+// OR of baz and foo. Baz == foo and we dedup.
+// Both scans cover the same 10 documents (40..49), so deduping leaves 10 results.
+orix1ix2 = {or: {args: {nodes: [ixscan1, ixscan2], dedup:true}}};
+res = db.runCommand({stageDebug: orix1ix2});
+assert.eq(res.ok, 1);
+assert.eq(res.results.length, 10);
+
+// No deduping, 2x the results.
+orix1ix2nodd = {or: {args: {nodes: [ixscan1, ixscan2], dedup:false}}};
+res = db.runCommand({stageDebug: orix1ix2nodd});
+assert.eq(res.ok, 1);
+assert.eq(res.results.length, 20);
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 2894b605eff..180a8f05971 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -17,6 +17,7 @@ Import("darwin windows solaris linux nix")
env.SConscript(['base/SConscript',
'db/auth/SConscript',
+ 'db/exec/SConscript',
'db/fts/SConscript',
'db/ops/SConscript',
'db/SConscript',
@@ -348,7 +349,7 @@ env.StaticLibrary("coredb", [
'expressions',
'expressions_geo',
'expressions_where',
- 'working_set',
+ 'db/exec/working_set',
'$BUILD_DIR/mongo/foundation'])
coreServerFiles = [ "db/client_basic.cpp",
@@ -463,11 +464,6 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/prefetch.cpp",
"db/repl/write_concern.cpp",
"db/btreecursor.cpp",
- "db/exec/and_hash.cpp",
- "db/exec/and_sorted.cpp",
- "db/exec/index_scan.cpp",
- "db/exec/stagedebug_cmd.cpp",
- "db/exec/working_set_common.cpp",
"db/index_legacy.cpp",
"db/index_selection.cpp",
"db/index/2d_access_method.cpp",
@@ -669,10 +665,6 @@ serverOnlyFiles += [ "s/d_logic.cpp",
env.StaticLibrary("defaultversion", "s/default_version.cpp")
-# Query execution
-env.StaticLibrary("working_set", [ "db/exec/working_set.cpp", ], LIBDEPS = [ "bson",])
-env.CppUnitTest("working_set_test", [ "db/exec/working_set_test.cpp" ], LIBDEPS = ["working_set"])
-
# Geo
env.StaticLibrary("geometry", [ "db/geo/hash.cpp", "db/geo/shapes.cpp", ], LIBDEPS = [ "bson" ])
env.StaticLibrary("geoparser", [ "db/geo/geoparser.cpp", ],
@@ -730,6 +722,7 @@ env.StaticLibrary("serveronly", serverOnlyFiles,
's/metadata',
"working_set",
"writebatch",
+ "db/exec/exec",
'$BUILD_DIR/third_party/shim_snappy'])
@@ -827,6 +820,7 @@ test = testEnv.Install(
"gridfs",
"s/upgrade",
"mocklib",
+ "db/exec/mock_stage",
"$BUILD_DIR/mongo/db/auth/authmocks"]))
if len(testEnv.subst('$PROGSUFFIX')):
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
new file mode 100644
index 00000000000..c0606c4ab1d
--- /dev/null
+++ b/src/mongo/db/exec/SConscript
@@ -0,0 +1,51 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env.StaticLibrary(
+ target = "working_set",
+ source = [
+ "working_set.cpp",
+ ],
+ LIBDEPS = [
+ "$BUILD_DIR/mongo/bson",
+ ],
+)
+
+env.CppUnitTest(
+ target = "working_set_test",
+ source = [
+ "working_set_test.cpp"
+ ],
+ LIBDEPS = [
+ "working_set",
+ ],
+)
+
+env.StaticLibrary(
+ target = "mock_stage",
+ source = [
+ "mock_stage.cpp",
+ ],
+ LIBDEPS = [
+ "working_set",
+ ],
+)
+
+env.StaticLibrary(
+ target = 'exec',
+ source = [
+ "and_hash.cpp",
+ "and_sorted.cpp",
+ "fetch.cpp",
+ "index_scan.cpp",
+ "limit.cpp",
+ "or.cpp",
+ "skip.cpp",
+ "stagedebug_cmd.cpp",
+ "working_set_common.cpp",
+ ],
+ LIBDEPS = [
+ "$BUILD_DIR/mongo/bson"
+ ],
+)
diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp
new file mode 100644
index 00000000000..d6f81c04693
--- /dev/null
+++ b/src/mongo/db/exec/fetch.cpp
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/fetch.h"
+
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pdfile.h"
+#include "mongo/util/fail_point_service.h"
+
+namespace mongo {
+
+ // Some fail points for testing.
+ MONGO_FP_DECLARE(fetchInMemoryFail);
+ MONGO_FP_DECLARE(fetchInMemorySucceed);
+
+ // Builds a fetch stage over 'child'. 'child' and 'matcher' are stored in scoped_ptr members
+ // (see fetch.h), so this stage owns them; 'ws' is kept as a raw pointer and is not owned.
+ // No page-in is pending initially, hence _idBeingPagedIn starts as INVALID_ID.
+ FetchStage::FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher)
+ : _ws(ws), _child(child), _matcher(matcher), _idBeingPagedIn(WorkingSet::INVALID_ID) { }
+
+ // Nothing to do explicitly; the scoped_ptr members release the child and the matcher.
+ FetchStage::~FetchStage() { }
+
+    // We are done only when no page-in is outstanding and the child has nothing more to give.
+    bool FetchStage::isEOF() {
+        // While a page-in request is outstanding we still owe the caller the result that
+        // _idBeingPagedIn refers to, so the child's EOF state is not even consulted.
+        const bool pageInPending = (WorkingSet::INVALID_ID != _idBeingPagedIn);
+        return !pageInPending && _child->isEOF();
+    }
+
+    // Reports whether the record data at 'data' can be read without a page-in. The two test
+    // fail points override the real check; fetchInMemoryFail is consulted first and wins.
+    bool recordInMemory(const char* data) {
+        if (MONGO_FAIL_POINT(fetchInMemoryFail)) { return false; }
+
+        // The success fail point short-circuits the real residency check.
+        return MONGO_FAIL_POINT(fetchInMemorySucceed) || Record::likelyInPhysicalMemory(data);
+    }
+
+    /**
+     * Produces the next fetched result, or asks the plan runner to page one in.
+     *
+     * Returns:
+     *   ADVANCED   - *out refers to a member that has an obj and passed the filter (if any).
+     *   NEED_FETCH - *out refers to a member whose loc must be paged in. Either this stage
+     *                requested it (the WSID is then held in _idBeingPagedIn) or a stage below
+     *                us did and we are passing the request up.
+     *   NEED_TIME  - a result was filtered out, or the child needs more time.
+     *   IS_EOF, FAILURE - nothing is output in *out.
+     */
+    PlanStage::StageState FetchStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        // If we asked our parent for a page-in last time work(...) was called, finish the fetch.
+        if (WorkingSet::INVALID_ID != _idBeingPagedIn) {
+            return fetchCompleted(out);
+        }
+
+        // If we're here, we're not waiting for a DiskLoc to be fetched. Get another to-be-fetched
+        // result from our child.
+        WorkingSetID id;
+        StageState status = _child->work(&id);
+
+        if (PlanStage::ADVANCED == status) {
+            WorkingSetMember* member = _ws->get(id);
+
+            // If there's an obj there, there is no fetching to perform.
+            if (member->hasObj()) {
+                return returnIfMatches(member, id, out);
+            }
+
+            // We need a valid loc to fetch from and this is the only state that has one.
+            verify(WorkingSetMember::LOC_AND_IDX == member->state);
+            verify(member->hasLoc());
+
+            Record* record = member->loc.rec();
+            const char* data = record->dataNoThrowing();
+
+            if (!recordInMemory(data)) {
+                // member->loc points to a record that's NOT in memory. Pass a fetch request up.
+                verify(WorkingSet::INVALID_ID == _idBeingPagedIn);
+                _idBeingPagedIn = id;
+                *out = id;
+                return PlanStage::NEED_FETCH;
+            }
+            else {
+                // Don't need index data anymore as we have an obj.
+                member->keyData.clear();
+                member->obj = BSONObj(data);
+                member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+                return returnIfMatches(member, id, out);
+            }
+        }
+        else if (PlanStage::NEED_FETCH == status) {
+            // A stage below us requested a page-in. We do no work of our own, but the plan
+            // runner reads the WSID to fetch from our out parameter, so it must be forwarded.
+            // The requesting stage keeps holding the WSID; we do not record it.
+            *out = id;
+            return status;
+        }
+        else {
+            // NEED_TIME, FAILURE, IS_EOF: nothing is output in the out parameter.
+            return status;
+        }
+    }
+
+ // Yield handling is delegated entirely to the child; this stage adds nothing of its own.
+ void FetchStage::prepareToYield() { _child->prepareToYield(); }
+
+ // Counterpart of prepareToYield(); likewise just forwarded to the child.
+ void FetchStage::recoverFromYield() { _child->recoverFromYield(); }
+
+    // Tells the child that 'dl' is going away and, if 'dl' is the very record we are waiting to
+    // have paged in, grabs its data right now before the DiskLoc perishes.
+    void FetchStage::invalidate(const DiskLoc& dl) {
+        _child->invalidate(dl);
+
+        // Only a member we are holding for a pending page-in can be affected at this level.
+        if (WorkingSet::INVALID_ID == _idBeingPagedIn) { return; }
+
+        WorkingSetMember* pending = _ws->get(_idBeingPagedIn);
+        verify(pending->hasLoc());
+
+        // Some other DiskLoc is being invalidated; our pending member is untouched.
+        if (!(pending->loc == dl)) { return; }
+
+        // The DiskLoc is about to perish so we force a fetch of the data.
+        // This is a fetch inside of a write lock (that somebody else holds) but the other
+        // holder is likely operating on this object so this shouldn't have to hit disk.
+        WorkingSetCommon::fetchAndInvalidateLoc(pending);
+    }
+
+ // Second half of a fetch: called from work() when _idBeingPagedIn is set, i.e. on the call
+ // after we returned NEED_FETCH. Turns the held member into one with an obj, clears
+ // _idBeingPagedIn, and returns ADVANCED or NEED_TIME depending on the filter.
+ PlanStage::StageState FetchStage::fetchCompleted(WorkingSetID* out) {
+ WorkingSetMember* member = _ws->get(_idBeingPagedIn);
+
+ // The DiskLoc we're waiting to page in was invalidated (forced fetch). Test for
+ // matching and maybe pass it up.
+ if (member->state == WorkingSetMember::OWNED_OBJ) {
+ WorkingSetID memberID = _idBeingPagedIn;
+ _idBeingPagedIn = WorkingSet::INVALID_ID;
+ return returnIfMatches(member, memberID, out);
+ }
+
+ // Assume that the caller has fetched appropriately.
+ // TODO: Do we want to double-check the runner? Not sure how reliable likelyInMemory is
+ // on all platforms.
+ verify(member->hasLoc());
+ verify(!member->hasObj());
+
+ // Make the (unowned) object.
+ Record* record = member->loc.rec();
+ const char* data = record->dataNoThrowing();
+ member->obj = BSONObj(data);
+
+ // Don't need index data anymore as we have an obj.
+ member->keyData.clear();
+ member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+ verify(!member->obj.isOwned());
+
+ // Return the obj if it passes our filter.
+ // _idBeingPagedIn is reset only after the verify() checks above, so it stays set if one
+ // of them fails.
+ WorkingSetID memberID = _idBeingPagedIn;
+ _idBeingPagedIn = WorkingSet::INVALID_ID;
+ return returnIfMatches(member, memberID, out);
+ }
+
+    // Applies the optional filter to 'member'. A passing member is handed to the caller via
+    // *out (ADVANCED); a failing one is freed from the working set (NEED_TIME).
+    PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member,
+                                                      WorkingSetID memberID,
+                                                      WorkingSetID* out) {
+        // No matcher means there is no filter, so everything passes.
+        const bool passesFilter = (NULL == _matcher) || _matcher->matches(member);
+
+        if (!passesFilter) {
+            _ws->free(memberID);
+            return PlanStage::NEED_TIME;
+        }
+
+        *out = memberID;
+        return PlanStage::ADVANCED;
+    }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h
new file mode 100644
index 00000000000..67b6c026d43
--- /dev/null
+++ b/src/mongo/db/exec/fetch.h
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage turns a DiskLoc into a BSONObj.
+ *
+ * In WorkingSetMember terms, it transitions from LOC_AND_IDX to LOC_AND_UNOWNED_OBJ by reading
+ * the record at the provided loc. Returns verbatim any data that already has an object.
+ *
+ * Preconditions: Valid DiskLoc.
+ */
+ class FetchStage : public PlanStage {
+ public:
+ // 'child' and 'matcher' end up in scoped_ptr members, so the stage takes ownership of
+ // both. 'ws' is not owned. A NULL 'matcher' means results are not filtered.
+ FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher);
+ virtual ~FetchStage();
+
+ // False while a requested page-in is still outstanding, otherwise the child's isEOF().
+ virtual bool isEOF();
+ // May return NEED_FETCH with *out set to the WSID whose loc should be paged in; the
+ // following call to work() then completes that fetch.
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ // Forwards to the child, and force-fetches the member awaiting page-in if its loc is 'dl'.
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ /**
+ * If the member (with id memberID) passes our filter, set *out to memberID and return
+ * ADVANCED. Otherwise, free memberID and return NEED_TIME.
+ */
+ StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID,
+ WorkingSetID* out);
+
+ /**
+ * work(...) delegates to this when we're called after requesting a fetch.
+ */
+ StageState fetchCompleted(WorkingSetID* out);
+
+ // _ws is not owned by us.
+ WorkingSet* _ws;
+ // Owned by us (scoped_ptr). _matcher may be NULL, meaning "no filter".
+ scoped_ptr<PlanStage> _child;
+ scoped_ptr<Matcher> _matcher;
+
+ // If we're fetching a DiskLoc and it points at something that's not in memory, we return
+ // a "please page this in" result and hold on to the WSID until the next call to work(...).
+ // WorkingSet::INVALID_ID when no page-in is pending.
+ WorkingSetID _idBeingPagedIn;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/limit.cpp b/src/mongo/db/exec/limit.cpp
new file mode 100644
index 00000000000..9f394e6f508
--- /dev/null
+++ b/src/mongo/db/exec/limit.cpp
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/limit.h"
+
+namespace mongo {
+
+ // 'child' is stored in a scoped_ptr member (see limit.h) and is therefore owned by this
+ // stage; 'ws' is kept as a raw pointer. 'limit' is the number of results still to return.
+ LimitStage::LimitStage(int limit, WorkingSet* ws, PlanStage* child)
+ : _ws(ws), _child(child), _numToReturn(limit) { }
+
+ LimitStage::~LimitStage() { }
+
+ // EOF once the remaining count is exactly zero or the child is exhausted.
+ // NOTE(review): a negative 'limit' never satisfies 0 == _numToReturn, so it would not cap
+ // the output at all -- confirm that callers never pass one.
+ bool LimitStage::isEOF() { return (0 == _numToReturn) || _child->isEOF(); }
+
+    /**
+     * Forwards results from the child until 'limit' of them have been returned.
+     *
+     * Returns ADVANCED with *out set to the child's WSID (consuming one unit of the limit),
+     * NEED_FETCH with *out set to the WSID a stage below wants paged in, or the child's
+     * NEED_TIME / FAILURE / IS_EOF unchanged with nothing output in *out.
+     */
+    PlanStage::StageState LimitStage::work(WorkingSetID* out) {
+        // If we've returned as many results as we're limited to, isEOF will be true.
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        WorkingSetID id;
+        StageState status = _child->work(&id);
+
+        if (PlanStage::ADVANCED == status) {
+            *out = id;
+            --_numToReturn;
+            return PlanStage::ADVANCED;
+        }
+        else if (PlanStage::NEED_FETCH == status) {
+            // The plan runner reads the WSID to page in from our out parameter, so the
+            // child's request must be forwarded along with the status. A fetch request is
+            // not a result and does not count against the limit.
+            *out = id;
+            return status;
+        }
+        else {
+            // NEED_TIME, FAILURE, IS_EOF: nothing is output in the out parameter.
+            return status;
+        }
+    }
+
+ // Yielding and invalidation are forwarded unchanged to the child; nothing is done here.
+ void LimitStage::prepareToYield() { _child->prepareToYield(); }
+
+ void LimitStage::recoverFromYield() { _child->recoverFromYield(); }
+
+ void LimitStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/limit.h b/src/mongo/db/exec/limit.h
new file mode 100644
index 00000000000..eb674d8deed
--- /dev/null
+++ b/src/mongo/db/exec/limit.h
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage implements limit functionality. It only returns 'limit' results before EOF.
+ *
+ * Sort has a baked-in limit, as it can optimize the sort if it has a limit.
+ *
+ * Preconditions: None.
+ */
+ class LimitStage : public PlanStage {
+ public:
+ // 'limit' is the number of results to return before reporting EOF. 'child' is stored in
+ // a scoped_ptr member, so this stage takes ownership of it.
+ LimitStage(int limit, WorkingSet* ws, PlanStage* child);
+ virtual ~LimitStage();
+
+ // True when the limit has been used up or the child is at EOF.
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ // These three just forward to the child.
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ // Stored as a raw pointer.
+ WorkingSet* _ws;
+ // Owned by us (scoped_ptr).
+ scoped_ptr<PlanStage> _child;
+
+ // We only return this many results.
+ // Decremented each time a result is passed up; EOF is reported when it reaches zero.
+ int _numToReturn;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/mock_stage.cpp b/src/mongo/db/exec/mock_stage.cpp
new file mode 100644
index 00000000000..128c93e1847
--- /dev/null
+++ b/src/mongo/db/exec/mock_stage.cpp
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/mock_stage.h"
+
+namespace mongo {
+
+ // 'ws' is the working set that mock results are allocated in; it is not owned (see the
+ // member comment in mock_stage.h).
+ MockStage::MockStage(WorkingSet* ws) : _ws(ws) { }
+
+ // Pops the next programmed state. For ADVANCED, also pops the next programmed member, copies
+ // it into a freshly allocated working set slot and reports that slot through *out. For any
+ // other state *out is left untouched.
+ PlanStage::StageState MockStage::work(WorkingSetID* out) {
+ if (isEOF()) { return PlanStage::IS_EOF; }
+
+ StageState state = _results.front();
+ _results.pop();
+
+ if (PlanStage::ADVANCED == state) {
+ // We advanced. Put the mock obj into the working set.
+ // NOTE(review): this assumes every queued ADVANCED has a matching entry in _members.
+ // pushBack(StageState) does not add one, so pushing ADVANCED through that overload
+ // would make front()/pop() run on an empty queue here.
+ WorkingSetID id = _ws->allocate();
+ WorkingSetMember* member = _ws->get(id);
+ *member = _members.front();
+ _members.pop();
+ *out = id;
+ }
+
+ return state;
+ }
+
+ // The mock is exhausted once every programmed state has been handed out by work().
+ bool MockStage::isEOF() { return _results.empty(); }
+
+ // Queues a bare state for work() to return. No member is queued alongside it, so this
+ // overload is meant for states other than ADVANCED.
+ void MockStage::pushBack(const PlanStage::StageState state) {
+ _results.push(state);
+ }
+
+ // Queues an ADVANCED result together with a copy of 'member' for work() to hand out.
+ void MockStage::pushBack(const WorkingSetMember& member) {
+ _results.push(PlanStage::ADVANCED);
+ _members.push(member);
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/mock_stage.h b/src/mongo/db/exec/mock_stage.h
new file mode 100644
index 00000000000..49b44fccf82
--- /dev/null
+++ b/src/mongo/db/exec/mock_stage.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/working_set.h"
+
+namespace mongo {
+
+ class DiskLoc;
+
+ /**
+ * MockStage is a data-producing stage that is used for testing. Unlike the other two leaf
+ * stages (CollectionScan and IndexScan) MockStage does not require any underlying storage
+ * layer.
+ *
+ * A MockStage is "programmed" by pushing return values from work() onto its internal queue.
+ * Calls to MockStage::work() pop values off that queue and return them in FIFO order,
+ * annotating the working set with data when appropriate.
+ */
+ class MockStage : public PlanStage {
+ public:
+ // 'ws' is where work() allocates the members it returns; it is not owned.
+ MockStage(WorkingSet* ws);
+ virtual ~MockStage() { }
+
+ // Returns the next queued state, or IS_EOF when the queue is empty.
+ virtual StageState work(WorkingSetID* out);
+
+ // True once every queued state has been consumed.
+ virtual bool isEOF();
+
+ // These don't really mean anything here.
+ // Some day we could count the # of calls to the yield functions to check that other stages
+ // have correct yielding behavior.
+ virtual void prepareToYield() { }
+ virtual void recoverFromYield() { }
+ virtual void invalidate(const DiskLoc& dl) { }
+
+ /**
+ * Add a result to the back of the queue. work() goes through the queue.
+ * Either no data is returned (just a state), or...
+ */
+ void pushBack(const PlanStage::StageState state);
+
+ /**
+ * ...data is returned (and we ADVANCED)
+ */
+ void pushBack(const WorkingSetMember& member);
+
+ private:
+ // We don't own this.
+ WorkingSet* _ws;
+
+ // The data we return.
+ // _results holds one entry per future work() call; _members holds one entry per ADVANCED
+ // that was queued through pushBack(const WorkingSetMember&), in the same order.
+ queue<PlanStage::StageState> _results;
+ queue<WorkingSetMember> _members;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/or.cpp b/src/mongo/db/exec/or.cpp
new file mode 100644
index 00000000000..a0a678e2b95
--- /dev/null
+++ b/src/mongo/db/exec/or.cpp
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "mongo/db/exec/or.h"
+
+namespace mongo {
+
+ // 'matcher' is stored in a scoped_ptr member (see or.h) and is owned by this stage; 'ws' is
+ // not. Children are added afterwards with addChild() and are worked starting from the first.
+ OrStage::OrStage(WorkingSet* ws, bool dedup, Matcher* matcher)
+ : _ws(ws), _matcher(matcher), _currentChild(0), _dedup(dedup) { }
+
+ // The children are held as raw pointers but owned by us, so they are deleted here.
+ OrStage::~OrStage() {
+ for (size_t i = 0; i < _children.size(); ++i) {
+ delete _children[i];
+ }
+ }
+
+ // Appends 'child' to the list of inputs; ownership passes to this stage (see destructor).
+ void OrStage::addChild(PlanStage* child) { _children.push_back(child); }
+
+ // Children are consumed one after another; we are done once we have moved past the last one.
+ // With no children at all this is true immediately.
+ bool OrStage::isEOF() { return _currentChild >= _children.size(); }
+
+    /**
+     * Works the current child and passes its results up, moving to the next child whenever the
+     * current one reaches EOF.
+     *
+     * Returns:
+     *   ADVANCED   - *out refers to a result that was not dropped by dedup or the filter.
+     *   NEED_TIME  - a result was dropped, a child finished while others remain, or the child
+     *                itself needs more time.
+     *   NEED_FETCH - *out refers to the member a stage below us wants paged in.
+     *   IS_EOF     - every child is exhausted.
+     *   FAILURE    - passed up from the child.
+     */
+    PlanStage::StageState OrStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        WorkingSetID id;
+        StageState childStatus = _children[_currentChild]->work(&id);
+
+        if (PlanStage::ADVANCED == childStatus) {
+            WorkingSetMember* member = _ws->get(id);
+            verify(member->hasLoc());
+
+            // If we're deduping...
+            if (_dedup) {
+                // ...and we've seen the DiskLoc before
+                if (_seen.end() != _seen.find(member->loc)) {
+                    // ...drop it.
+                    _ws->free(id);
+                    return PlanStage::NEED_TIME;
+                }
+                else {
+                    // Otherwise, note that we've seen it. This happens before the filter is
+                    // applied, so a loc rejected by the filter is still remembered.
+                    _seen.insert(member->loc);
+                }
+            }
+
+            if (NULL == _matcher || _matcher->matches(member)) {
+                // Match! return it.
+                *out = id;
+                return PlanStage::ADVANCED;
+            }
+            else {
+                // Does not match, try again.
+                _ws->free(id);
+                return PlanStage::NEED_TIME;
+            }
+        }
+        else if (PlanStage::IS_EOF == childStatus) {
+            // Done with _currentChild, move to the next one.
+            ++_currentChild;
+
+            // Maybe we're out of children.
+            if (isEOF()) {
+                return PlanStage::IS_EOF;
+            }
+            else {
+                return PlanStage::NEED_TIME;
+            }
+        }
+        else if (PlanStage::NEED_FETCH == childStatus) {
+            // The plan runner reads the WSID to page in from our out parameter, so the
+            // child's request must be forwarded along with the status. We do no other work.
+            *out = id;
+            return childStatus;
+        }
+        else {
+            // NEED_TIME, FAILURE: pass them up. Nothing is output in the out parameter.
+            return childStatus;
+        }
+    }
+
+ // Every child is notified, including those we have already finished with and those we have
+ // not started yet.
+ void OrStage::prepareToYield() {
+ for (size_t i = 0; i < _children.size(); ++i) {
+ _children[i]->prepareToYield();
+ }
+ }
+
+ // Counterpart of prepareToYield(); again forwarded to every child.
+ void OrStage::recoverFromYield() {
+ for (size_t i = 0; i < _children.size(); ++i) {
+ _children[i]->recoverFromYield();
+ }
+ }
+
+ // Forwards the invalidation of 'dl' to every child and forgets 'dl' for dedup purposes.
+ // Does nothing at all once this stage is at EOF.
+ void OrStage::invalidate(const DiskLoc& dl) {
+ if (isEOF()) { return; }
+
+ for (size_t i = 0; i < _children.size(); ++i) {
+ _children[i]->invalidate(dl);
+ }
+
+ // If we see DL again it is not the same record as it once was so we still want to
+ // return it.
+ if (_dedup) { _seen.erase(dl); }
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/or.h b/src/mongo/db/exec/or.h
new file mode 100644
index 00000000000..8df29a0c00b
--- /dev/null
+++ b/src/mongo/db/exec/or.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/platform/unordered_set.h"
+
+namespace mongo {
+
+ /**
+ * This stage outputs the union of its children. It optionally deduplicates on DiskLoc.
+ *
+ * Preconditions: Valid DiskLoc.
+ *
+ * If we're deduping, we may fail to dedup any invalidated DiskLoc properly.
+ */
+ class OrStage : public PlanStage {
+ public:
+ // 'dedup' turns on DiskLoc-based deduplication. 'matcher' is stored in a scoped_ptr and
+ // owned by this stage; NULL means results are not filtered. 'ws' is not owned.
+ OrStage(WorkingSet* ws, bool dedup, Matcher* matcher);
+ virtual ~OrStage();
+
+ // Adds an input. The stage takes ownership of 'child' and deletes it on destruction.
+ void addChild(PlanStage* child);
+
+ // True once every child has been worked to EOF.
+ virtual bool isEOF();
+
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // Optional filter applied to every result; NULL means no filter.
+ scoped_ptr<Matcher> _matcher;
+
+ // Owned by us.
+ vector<PlanStage*> _children;
+
+ // Which of _children are we calling work(...) on now?
+ size_t _currentChild;
+
+ // True if we dedup on DiskLoc, false otherwise.
+ bool _dedup;
+
+ // Which DiskLocs have we returned?
+ // Only maintained when _dedup is true. A loc is recorded before the filter runs, so this
+ // can also contain locs that were seen but filtered out.
+ unordered_set<DiskLoc, DiskLoc::Hasher> _seen;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h
index c1a232b3b2c..0532ef1cecf 100644
--- a/src/mongo/db/exec/plan_stage.h
+++ b/src/mongo/db/exec/plan_stage.h
@@ -94,16 +94,33 @@ namespace mongo {
* All possible return values of work(...)
*/
enum StageState {
- // work(...) has returned a new result in its out parameter.
+ // work(...) has returned a new result in its out parameter. The caller must free it
+ // from the working set when done with it.
ADVANCED,
- // work(...) won't do anything more. isEOF() will also be true.
+
+ // work(...) won't do anything more. isEOF() will also be true. There is nothing
+ // output in the out parameter.
IS_EOF,
- // work(...) needs more time to product a result. Call work(...) again.
+
+ // work(...) needs more time to produce a result. Call work(...) again. There is
+ // nothing output in the out parameter.
NEED_TIME,
- // Something has gone unrecoverably wrong. Stop running this query.
+
+ // Something has gone unrecoverably wrong. Stop running this query. There is nothing
+ // output in the out parameter.
FAILURE,
- // Something isn't in memory. Fetch it. TODO: actually support this (forthcoming).
- NEED_YIELD,
+
+ // Something isn't in memory. Fetch it.
+ //
+ // Full fetch semantics:
+ // The fetch-requesting stage populates the out parameter of work(...) with a WSID that
+ // refers to a WSM with a valid loc. Each stage that receives a NEED_FETCH from a child
+ // must propagate the NEED_FETCH up and perform no work. The plan runner is responsible
+ // for paging in the data upon receipt of a NEED_FETCH. The plan runner does NOT free
+ // the WSID of the requested fetch. The stage that requested the fetch holds the WSID
+ // of the loc it wants fetched. On the next call to work() that stage can assume a
+ // fetch was performed on the WSM that the held WSID refers to.
+ NEED_FETCH,
};
/**
diff --git a/src/mongo/db/exec/simple_plan_runner.h b/src/mongo/db/exec/simple_plan_runner.h
index 34ca0b2f29a..797c3c9c9ec 100644
--- a/src/mongo/db/exec/simple_plan_runner.h
+++ b/src/mongo/db/exec/simple_plan_runner.h
@@ -17,6 +17,7 @@
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pdfile.h"
namespace mongo {
@@ -26,7 +27,6 @@ namespace mongo {
*
* TODO: Yielding policy
* TODO: Graceful error handling
- * TODO: Fetch
* TODO: Stats, diagnostics, instrumentation, etc.
*/
class SimplePlanRunner {
@@ -59,8 +59,31 @@ namespace mongo {
else if (code == PlanStage::NEED_TIME) {
// TODO: Occasionally yield. For now, we run until we get another result.
}
+ else if (PlanStage::NEED_FETCH == code) {
+ // id has a loc and refers to an obj we need to fetch.
+ WorkingSetMember* member = _workingSet.get(id);
+
+ // This must be true for somebody to request a fetch and can only change when an
+ // invalidation happens, which is when we give up a lock. Don't give up the
+ // lock between receiving the NEED_FETCH and actually fetching(?).
+ verify(member->hasLoc());
+
+ // Actually bring record into memory.
+ Record* record = member->loc.rec();
+ record->touch();
+
+ // Record should be in memory now. Log if it's not.
+ if (!Record::likelyInPhysicalMemory(record->dataNoThrowing())) {
+ OCCASIONALLY {
+ warning() << "Record wasn't in memory immediately after fetch: "
+ << member->loc.toString() << endl;
+ }
+ }
+
+ // Note that we're not freeing id. Fetch semantics say that we shouldn't.
+ }
else {
- // IS_EOF, FAILURE, NEED_YIELD. We just stop here.
+ // IS_EOF, FAILURE. We just stop here.
return false;
}
}
diff --git a/src/mongo/db/exec/skip.cpp b/src/mongo/db/exec/skip.cpp
new file mode 100644
index 00000000000..8dfb4e33df3
--- /dev/null
+++ b/src/mongo/db/exec/skip.cpp
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "mongo/db/exec/skip.h"
+
+namespace mongo {
+
+    /**
+     * Builds a stage that discards the first 'toSkip' ADVANCED results of 'child' and passes
+     * the rest through unchanged.  '_child' is a scoped_ptr (see skip.h), so 'child' is deleted
+     * together with this stage.  'ws' is the working set skipped results are freed from.
+     */
+    SkipStage::SkipStage(int toSkip, WorkingSet* ws, PlanStage* child)
+        : _ws(ws), _child(child), _toSkip(toSkip) { }
+
+    SkipStage::~SkipStage() { }
+
+    // We have nothing buffered, so we are done exactly when our child is done.
+    bool SkipStage::isEOF() { return _child->isEOF(); }
+
+    /**
+     * Performs one unit of work on the child.
+     *
+     * ADVANCED results are freed and reported as NEED_TIME while there is still something left
+     * to skip; afterwards they are returned through 'out'.  NEED_FETCH is passed up together
+     * with the child's id.  Every other state is passed up untouched.
+     */
+    PlanStage::StageState SkipStage::work(WorkingSetID* out) {
+        if (isEOF()) { return PlanStage::IS_EOF; }
+
+        // Value-initialized so that we never copy an indeterminate id into *out should the
+        // child report NEED_FETCH without writing one.
+        WorkingSetID id = WorkingSetID();
+        const StageState status = _child->work(&id);
+
+        if (PlanStage::NEED_FETCH == status) {
+            // The plan runner finds the member to page in through the id reported by the root
+            // stage, so the id chosen by our child must travel up along with the state.
+            *out = id;
+            return status;
+        }
+
+        if (PlanStage::ADVANCED != status) {
+            // NEED_TIME, FAILURE, IS_EOF: nothing for us to do.
+            return status;
+        }
+
+        // If we're still skipping results, drop this one.
+        if (_toSkip > 0) {
+            --_toSkip;
+            _ws->free(id);
+            return PlanStage::NEED_TIME;
+        }
+
+        *out = id;
+        return PlanStage::ADVANCED;
+    }
+
+    // Yield, recovery and invalidation are forwarded to the child; this stage holds no
+    // DiskLocs of its own.
+    void SkipStage::prepareToYield() { _child->prepareToYield(); }
+
+    void SkipStage::recoverFromYield() { _child->recoverFromYield(); }
+
+    void SkipStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); }
+
+}  // namespace mongo
diff --git a/src/mongo/db/exec/skip.h b/src/mongo/db/exec/skip.h
new file mode 100644
index 00000000000..5086e9d66aa
--- /dev/null
+++ b/src/mongo/db/exec/skip.h
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mongo/db/diskloc.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+
+ /**
+ * This stage implements skip functionality. It drops the first 'toSkip' results from its child,
+ * then returns the rest verbatim.
+ *
+ * Preconditions: None.
+ */
+ class SkipStage : public PlanStage {
+ public:
+ // 'child' is stored in a scoped_ptr member, so this stage deletes it on destruction.
+ // 'ws' is kept as a plain pointer; skipped results are freed through it.
+ // A 'toSkip' of zero or less skips nothing.
+ SkipStage(int toSkip, WorkingSet* ws, PlanStage* child);
+ virtual ~SkipStage();
+
+ // EOF exactly when the child is EOF.
+ virtual bool isEOF();
+ // Each skipped child result is freed from the working set and reported as NEED_TIME.
+ virtual StageState work(WorkingSetID* out);
+
+ // These three are forwarded to the child.
+ virtual void prepareToYield();
+ virtual void recoverFromYield();
+ virtual void invalidate(const DiskLoc& dl);
+
+ private:
+ // Working set that skipped results are freed from.
+ WorkingSet* _ws;
+ // The stage whose results we skip.
+ scoped_ptr<PlanStage> _child;
+
+ // We drop the first _toSkip results that we would have returned.
+ int _toSkip;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp
index e15e8a96489..d55a9dd497b 100644
--- a/src/mongo/db/exec/stagedebug_cmd.cpp
+++ b/src/mongo/db/exec/stagedebug_cmd.cpp
@@ -20,7 +20,11 @@
#include "mongo/db/commands.h"
#include "mongo/db/exec/and_hash.h"
#include "mongo/db/exec/and_sorted.h"
+#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/limit.h"
+#include "mongo/db/exec/or.h"
+#include "mongo/db/exec/skip.h"
#include "mongo/db/exec/simple_plan_runner.h"
#include "mongo/db/index/catalog_hack.h"
#include "mongo/db/jsobj.h"
@@ -49,16 +53,16 @@ namespace mongo {
*
* node -> {andHash: {filter: {filter}, args: { nodes: [node, node]}}}
* node -> {andSorted: {filter: {filter}, args: { nodes: [node, node]}}}
+ * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}}
+ * node -> {fetch: {filter: {filter}, args: {node: node}}}
+ * node -> {limit: {args: {node: node, num: posint}}}
+ * node -> {skip: {args: {node: node, num: posint}}}
*
* Forthcoming Nodes:
*
* node -> {cscan: {filter: {filter}, args: {name: "collectionname" }}}
- * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}}
- * node -> {fetch: {filter: {filter}, args: {node: node}}}
* node -> {sort: {filter: {filter}, args: {node: node, pattern: objWithSortCriterion}}}
* node -> {dedup: {filter: {filter}, args: {node: node, field: field}}}
- * node -> {limit: {filter: filter}, args: {node: node, num: posint}}
- * node -> {skip: {filter: filter}, args: {node: node, num: posint}}
* node -> {unwind: {filter: filter}, args: {node: node, field: field}}
*/
class StageDebugCmd : public Command {
@@ -203,6 +207,52 @@ namespace mongo {
return andStage.release();
}
+            else if ("or" == nodeName) {
+                // {or: {filter: {filter}, args: {dedup: bool, nodes: [node, node]}}}
+                uassert(16934, "Nodes argument must be provided to OR",
+                        nodeArgs["nodes"].isABSONObj());
+                uassert(16935, "Dedup argument must be provided to OR",
+                        !nodeArgs["dedup"].eoo());
+                BSONObjIterator it(nodeArgs["nodes"].Obj());
+                // Held in an auto_ptr so the partially built stage (and the children it already
+                // owns) is destroyed if we bail out or a uassert fires below.
+                auto_ptr<OrStage> orStage(new OrStage(workingSet, nodeArgs["dedup"].Bool(),
+                                                      matcher.release()));
+                while (it.more()) {
+                    BSONElement e = it.next();
+                    // Every entry of 'nodes' must itself be a node description.
+                    if (!e.isABSONObj()) { return NULL; }
+                    PlanStage* subNode = parseQuery(dbname, e.Obj(), workingSet);
+                    uassert(16936, "Can't parse sub-node of OR: " + e.Obj().toString(),
+                            NULL != subNode);
+                    // takes ownership
+                    orStage->addChild(subNode);
+                }
+
+                return orStage.release();
+            }
+            else if ("fetch" == nodeName) {
+                // {fetch: {filter: {filter}, args: {node: node}}}
+                uassert(16929, "Node argument must be provided to fetch",
+                        nodeArgs["node"].isABSONObj());
+                PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+                // parseQuery returns NULL for a node it cannot parse.  Report that as a parse
+                // failure instead of building a stage around a NULL child.  'matcher' has not
+                // been released yet, so it still cleans up after itself.
+                if (NULL == subNode) { return NULL; }
+                return new FetchStage(workingSet, subNode, matcher.release());
+            }
+            else if ("limit" == nodeName) {
+                // {limit: {args: {node: node, num: posint}}}
+                uassert(16937, "Limit stage doesn't have a filter (put it on the child)",
+                        NULL == matcher.get());
+                uassert(16930, "Node argument must be provided to limit",
+                        nodeArgs["node"].isABSONObj());
+                uassert(16931, "Num argument must be provided to limit",
+                        nodeArgs["num"].isNumber());
+                PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+                // parseQuery returns NULL for a node it cannot parse.  Report that as a parse
+                // failure instead of building a stage around a NULL child.
+                if (NULL == subNode) { return NULL; }
+                return new LimitStage(nodeArgs["num"].numberInt(), workingSet, subNode);
+            }
+            else if ("skip" == nodeName) {
+                // {skip: {args: {node: node, num: posint}}}
+                uassert(16938, "Skip stage doesn't have a filter (put it on the child)",
+                        NULL == matcher.get());
+                uassert(16932, "Node argument must be provided to skip",
+                        nodeArgs["node"].isABSONObj());
+                uassert(16933, "Num argument must be provided to skip",
+                        nodeArgs["num"].isNumber());
+                PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet);
+                // parseQuery returns NULL for a node it cannot parse.  Report that as a parse
+                // failure instead of building a stage around a NULL child.
+                if (NULL == subNode) { return NULL; }
+                return new SkipStage(nodeArgs["num"].numberInt(), workingSet, subNode);
+            }
else {
return NULL;
}
diff --git a/src/mongo/db/matcher/matcher.cpp b/src/mongo/db/matcher/matcher.cpp
index 8cb4a0b4286..be47ae91624 100644
--- a/src/mongo/db/matcher/matcher.cpp
+++ b/src/mongo/db/matcher/matcher.cpp
@@ -19,7 +19,6 @@
#include "mongo/pch.h"
#include "mongo/base/init.h"
-#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/matcher/matcher.h"
diff --git a/src/mongo/db/pdfile.h b/src/mongo/db/pdfile.h
index b1ff2c118a0..68fd32d0f89 100644
--- a/src/mongo/db/pdfile.h
+++ b/src/mongo/db/pdfile.h
@@ -259,6 +259,9 @@ namespace mongo {
const char * data() const { _accessing(); return _data; }
char * data() { _accessing(); return _data; }
+ // Same pointer as data(), but without the _accessing() call that data() makes first.
+ // NOTE(review): presumably _accessing() is what can throw, hence the name -- confirm.
+ const char * dataNoThrowing() const { return _data; }
+ char * dataNoThrowing() { return _data; }
+
int netLength() const { _accessing(); return _netLength(); }
/* use this when a record is deleted. basically a union with next/prev fields */
diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp
new file mode 100644
index 00000000000..ab34f2823fb
--- /dev/null
+++ b/src/mongo/dbtests/query_stage_fetch.cpp
@@ -0,0 +1,389 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * This file tests db/exec/fetch.cpp. Fetch goes to disk so we cannot test outside of a dbtest.
+ */
+
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/cursor.h"
+#include "mongo/db/exec/fetch.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/mock_stage.h"
+#include "mongo/db/instance.h"
+#include "mongo/db/json.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/pdfile.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/fail_point_registry.h"
+#include "mongo/util/fail_point_service.h"
+
+namespace QueryStageFetch {
+
+    /**
+     * Fixture shared by the fetch stage tests.  Wraps a direct client bound to the test
+     * namespace and drops that collection when the fixture is destroyed.
+     */
+    class QueryStageFetchBase {
+    public:
+        // Namespace every helper below operates on.
+        static const char* ns() { return "unittests.QueryStageFetch"; }
+
+        QueryStageFetchBase() { }
+
+        virtual ~QueryStageFetchBase() {
+            _client.dropCollection(ns());
+        }
+
+        void insert(const BSONObj& obj) {
+            _client.insert(ns(), obj);
+        }
+
+        void remove(const BSONObj& obj) {
+            _client.remove(ns(), obj);
+        }
+
+        // Adds the DiskLoc of every document currently in the collection to 'out'.
+        void getLocs(set<DiskLoc>* out) {
+            shared_ptr<Cursor> cursor = theDataFileMgr.findAll(ns());
+            while (cursor->ok()) {
+                out->insert(cursor->currLoc());
+                cursor->advance();
+            }
+        }
+
+    private:
+        static DBDirectClient _client;
+    };
+
+    DBDirectClient QueryStageFetchBase::_client;
+
+ //
+ // Test that a fetch is passed up when it's not in memory: the first work() must return
+ // NEED_FETCH, and the following work() must return the member with its object attached.
+ //
+ class FetchStageNotInMemory : public QueryStageFetchBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ WorkingSet ws;
+
+ // Add an object to the DB.
+ insert(BSON("foo" << 5));
+ set<DiskLoc> locs;
+ getLocs(&locs);
+ ASSERT_EQUALS(size_t(1), locs.size());
+
+ // Create a mock stage that returns the WSM.
+ auto_ptr<MockStage> mockStage(new MockStage(&ws));
+
+ // Mock data.
+ {
+ WorkingSetMember mockMember;
+ mockMember.state = WorkingSetMember::LOC_AND_IDX;
+ mockMember.loc = *locs.begin();
+
+ // State is loc and index, shouldn't be able to get the foo data inside.
+ BSONElement elt;
+ ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt));
+ mockStage->pushBack(mockMember);
+ }
+
+ // The mock stage is released into the fetch stage here.
+ auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL));
+
+ // Set the fail point to return not in memory.
+ FailPointRegistry* reg = getGlobalFailPointRegistry();
+ FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail");
+ fetchInMemoryFail->setMode(FailPoint::alwaysOn);
+
+ // First call should return a fetch request as it's not in memory.
+ WorkingSetID id;
+ PlanStage::StageState state;
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_FETCH, state);
+
+ // Let's do the fetch ourselves (though it doesn't really matter)
+ WorkingSetMember* member = ws.get(id);
+ ASSERT_FALSE(member->hasObj());
+ member->loc.rec()->touch();
+
+ // Next call to work() should give us the object in a different state.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::ADVANCED, state);
+ ASSERT_EQUALS(WorkingSetMember::LOC_AND_UNOWNED_OBJ, member->state);
+
+ // We should be able to get data from the obj now.
+ BSONElement elt;
+ ASSERT_TRUE(member->getFieldDotted("foo", &elt));
+ ASSERT_EQUALS(elt.numberInt(), 5);
+
+ // Mock stage is EOF so fetch should be too.
+ ASSERT_TRUE(fetchStage->isEOF());
+
+ // Turn off fail point for further tests.
+ // NOTE(review): if an ASSERT above fails before this line runs, the fail point
+ // presumably stays enabled for later tests -- confirm and consider a scoped guard.
+ fetchInMemoryFail->setMode(FailPoint::off);
+ }
+ };
+
+ //
+ // Test that a fetch is not passed up when it's in memory: the very first work() must
+ // return ADVANCED with the object attached.
+ //
+ class FetchStageInMemory : public QueryStageFetchBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ WorkingSet ws;
+
+ // Add an object to the DB.
+ insert(BSON("foo" << 5));
+ set<DiskLoc> locs;
+ getLocs(&locs);
+ ASSERT_EQUALS(size_t(1), locs.size());
+
+ // Create a mock stage that returns the WSM.
+ auto_ptr<MockStage> mockStage(new MockStage(&ws));
+
+ // Mock data.
+ {
+ WorkingSetMember mockMember;
+ mockMember.state = WorkingSetMember::LOC_AND_IDX;
+ mockMember.loc = *locs.begin();
+
+ // State is loc and index, shouldn't be able to get the foo data inside.
+ BSONElement elt;
+ ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt));
+ mockStage->pushBack(mockMember);
+ }
+
+ // The mock stage is released into the fetch stage here.
+ auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL));
+
+ // Set the fail point to return in memory.
+ FailPointRegistry* reg = getGlobalFailPointRegistry();
+ FailPoint* fetchInMemorySucceed = reg->getFailPoint("fetchInMemorySucceed");
+ fetchInMemorySucceed->setMode(FailPoint::alwaysOn);
+
+ // First call fetches as expected.
+ WorkingSetID id;
+ PlanStage::StageState state;
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::ADVANCED, state);
+
+ // State should have changed.
+ WorkingSetMember* member = ws.get(id);
+ ASSERT_EQUALS(WorkingSetMember::LOC_AND_UNOWNED_OBJ, member->state);
+
+ // We should be able to get data from the obj now.
+ BSONElement elt;
+ ASSERT_TRUE(member->getFieldDotted("foo", &elt));
+ ASSERT_EQUALS(elt.numberInt(), 5);
+
+ // Mock stage is EOF so fetch should be too.
+ ASSERT_TRUE(fetchStage->isEOF());
+
+ // Turn off fail point for further tests.
+ // NOTE(review): if an ASSERT above fails before this line runs, the fail point
+ // presumably stays enabled for later tests -- confirm and consider a scoped guard.
+ fetchInMemorySucceed->setMode(FailPoint::off);
+ }
+ };
+
+ //
+ // Test mid-fetch invalidation: the DiskLoc is invalidated between the NEED_FETCH and the
+ // next work(), and the member must then come back in the OWNED_OBJ state.
+ //
+ class FetchStageInvalidation : public QueryStageFetchBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ WorkingSet ws;
+
+ // Add an object to the DB.
+ insert(BSON("foo" << 5));
+ set<DiskLoc> locs;
+ getLocs(&locs);
+ ASSERT_EQUALS(size_t(1), locs.size());
+
+ // Create a mock stage that returns the WSM.
+ auto_ptr<MockStage> mockStage(new MockStage(&ws));
+
+ // Mock data.
+ {
+ WorkingSetMember mockMember;
+ mockMember.state = WorkingSetMember::LOC_AND_IDX;
+ mockMember.loc = *locs.begin();
+
+ // State is loc and index, shouldn't be able to get the foo data inside.
+ BSONElement elt;
+ ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt));
+ mockStage->pushBack(mockMember);
+ }
+
+ // The mock stage is released into the fetch stage here.
+ auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL));
+
+ // Set the fail point to return not in memory.
+ FailPointRegistry* reg = getGlobalFailPointRegistry();
+ FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail");
+ fetchInMemoryFail->setMode(FailPoint::alwaysOn);
+
+ // First call should return a fetch request as it's not in memory.
+ WorkingSetID id;
+ PlanStage::StageState state;
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_FETCH, state);
+
+ WorkingSetMember* member = ws.get(id);
+
+ // Invalidate the DL.
+ fetchStage->invalidate(member->loc);
+
+ // Next call to work() should give us the OWNED obj as it was invalidated mid-page-in.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::ADVANCED, state);
+ ASSERT_EQUALS(WorkingSetMember::OWNED_OBJ, member->state);
+
+ // We should be able to get data from the obj now.
+ BSONElement elt;
+ ASSERT_TRUE(member->getFieldDotted("foo", &elt));
+ ASSERT_EQUALS(elt.numberInt(), 5);
+
+ // Mock stage is EOF so fetch should be too.
+ ASSERT_TRUE(fetchStage->isEOF());
+
+ // Turn off fail point for further tests.
+ // NOTE(review): if an ASSERT above fails before this line runs, the fail point
+ // presumably stays enabled for later tests -- confirm and consider a scoped guard.
+ fetchInMemoryFail->setMode(FailPoint::off);
+ }
+ };
+
+
+ //
+ // Test that a WSM with an obj is passed through verbatim: members that already carry an
+ // object (unowned or owned) must come back as ADVANCED without a NEED_FETCH in between.
+ //
+ class FetchStageAlreadyFetched : public QueryStageFetchBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ WorkingSet ws;
+
+ // Add an object to the DB.
+ insert(BSON("foo" << 5));
+ set<DiskLoc> locs;
+ getLocs(&locs);
+ ASSERT_EQUALS(size_t(1), locs.size());
+
+ // Create a mock stage that returns the WSM.
+ auto_ptr<MockStage> mockStage(new MockStage(&ws));
+
+ // Mock data: one member pointing at the stored document, one owning its own object.
+ {
+ WorkingSetMember mockMember;
+ mockMember.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+ mockMember.loc = *locs.begin();
+ mockMember.obj = mockMember.loc.obj();
+ // Points into our DB.
+ ASSERT_FALSE(mockMember.obj.isOwned());
+ mockStage->pushBack(mockMember);
+
+ // Same local reused for the second member.
+ mockMember.state = WorkingSetMember::OWNED_OBJ;
+ mockMember.loc = DiskLoc();
+ mockMember.obj = BSON("foo" << 6);
+ ASSERT_TRUE(mockMember.obj.isOwned());
+ mockStage->pushBack(mockMember);
+ }
+
+ // The mock stage is released into the fetch stage here.
+ auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL));
+
+ // Set the fail point to return not in memory. Even so, no fetch request is expected
+ // below because both members already have an obj.
+ FailPointRegistry* reg = getGlobalFailPointRegistry();
+ FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail");
+ fetchInMemoryFail->setMode(FailPoint::alwaysOn);
+
+ WorkingSetID id;
+ PlanStage::StageState state;
+
+ // Don't bother doing any fetching if an obj exists already.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::ADVANCED, state);
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::ADVANCED, state);
+
+ // No more data to fetch, so, EOF.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::IS_EOF, state);
+
+ // NOTE(review): if an ASSERT above fails before this line runs, the fail point
+ // presumably stays enabled for later tests -- confirm and consider a scoped guard.
+ fetchInMemoryFail->setMode(FailPoint::off);
+ }
+ };
+
+ //
+ // Test matching with fetch: the fetched document does not satisfy the stage's filter, so
+ // after the NEED_FETCH the stage must report NEED_TIME rather than ADVANCED.
+ //
+ class FetchStageFilter : public QueryStageFetchBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ WorkingSet ws;
+
+ // Add an object to the DB.
+ insert(BSON("foo" << 5));
+ set<DiskLoc> locs;
+ getLocs(&locs);
+ ASSERT_EQUALS(size_t(1), locs.size());
+
+ // Create a mock stage that returns the WSM.
+ auto_ptr<MockStage> mockStage(new MockStage(&ws));
+
+ // Mock data.
+ {
+ WorkingSetMember mockMember;
+ mockMember.state = WorkingSetMember::LOC_AND_IDX;
+ mockMember.loc = *locs.begin();
+
+ // State is loc and index, shouldn't be able to get the foo data inside.
+ BSONElement elt;
+ ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt));
+ mockStage->pushBack(mockMember);
+ }
+
+ // Matcher requires that foo==6 but we only have data with foo==5.
+ auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(),
+ new Matcher(BSON("foo" << 6))));
+
+ // Set the fail point to return not in memory so we get a fetch request.
+ FailPointRegistry* reg = getGlobalFailPointRegistry();
+ FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail");
+ fetchInMemoryFail->setMode(FailPoint::alwaysOn);
+
+ // First call should return a fetch request as it's not in memory.
+ WorkingSetID id;
+ PlanStage::StageState state;
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_FETCH, state);
+
+ // Normally we'd return the object but we have a filter that prevents it.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_TIME, state);
+
+ // No more data to fetch, so, EOF.
+ state = fetchStage->work(&id);
+ ASSERT_EQUALS(PlanStage::IS_EOF, state);
+
+ // NOTE(review): if an ASSERT above fails before this line runs, the fail point
+ // presumably stays enabled for later tests -- confirm and consider a scoped guard.
+ fetchInMemoryFail->setMode(FailPoint::off);
+ }
+ };
+
+ // Suite "query_stage_fetch": adds every fetch stage test above. The single instance
+ // 'queryStageFetchAll' is defined together with the class.
+ class All : public Suite {
+ public:
+ All() : Suite( "query_stage_fetch" ) { }
+
+ // Tests are added in this order, which differs from their order of definition above.
+ void setupTests() {
+ add<FetchStageNotInMemory>();
+ add<FetchStageInMemory>();
+ add<FetchStageAlreadyFetched>();
+ add<FetchStageInvalidation>();
+ add<FetchStageFilter>();
+ }
+ } queryStageFetchAll;
+
+} // namespace QueryStageFetch
diff --git a/src/mongo/dbtests/query_stage_limit_skip.cpp b/src/mongo/dbtests/query_stage_limit_skip.cpp
new file mode 100644
index 00000000000..c752b589889
--- /dev/null
+++ b/src/mongo/dbtests/query_stage_limit_skip.cpp
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2013 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * This file tests db/exec/limit.cpp and db/exec/skip.cpp.
+ */
+
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/exec/limit.h"
+#include "mongo/db/exec/mock_stage.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/skip.h"
+#include "mongo/db/instance.h"
+#include "mongo/db/json.h"
+#include "mongo/dbtests/dbtests.h"
+
+using namespace mongo;
+
+namespace {
+
+ static const int N = 50;
+
+    /**
+     * Builds a MockStage over 'ws' and hands it to the caller, who owns it.
+     *
+     * For each of the N results the mock is queued with, in order: NEED_TIME, the result itself
+     * (an owned {x: i} object), NEED_TIME, NEED_FETCH.  Only the N results are ADVANCED; the
+     * other entries are stalling states.
+     */
+    MockStage* getMS(WorkingSet* ws) {
+        auto_ptr<MockStage> mock(new MockStage(ws));
+
+        for (int value = 0; value < N; ++value) {
+            mock->pushBack(PlanStage::NEED_TIME);
+
+            WorkingSetMember member;
+            member.state = WorkingSetMember::OWNED_OBJ;
+            member.obj = BSON("x" << value);
+            mock->pushBack(member);
+
+            mock->pushBack(PlanStage::NEED_TIME);
+            mock->pushBack(PlanStage::NEED_FETCH);
+        }
+
+        return mock.release();
+    }
+
+    // Runs 'stage' until it reports EOF and returns how many work() calls came back ADVANCED.
+    // Every other state is ignored, as is the id work() hands out.
+    int countResults(PlanStage* stage) {
+        int advanced = 0;
+        for (;;) {
+            if (stage->isEOF()) { break; }
+
+            WorkingSetID ignored;
+            if (PlanStage::ADVANCED == stage->work(&ignored)) {
+                ++advanced;
+            }
+        }
+        return advanced;
+    }
+
+    //
+    // Every mock child built by getMS() yields N results.  For each count in [0, 2N), check
+    // that skipping 'count' leaves max(0, N - count) results and that limiting to 'count'
+    // yields min(N, count) results.
+    //
+    class LimitSkipBasicTest {
+    public:
+        void run() {
+            const int upperBound = 2 * N;
+            for (int count = 0; count < upperBound; ++count) {
+                WorkingSet ws;
+
+                // Each stage gets its own freshly populated mock child.
+                scoped_ptr<PlanStage> skipStage(new SkipStage(count, &ws, getMS(&ws)));
+                ASSERT_EQUALS(max(0, N - count), countResults(skipStage.get()));
+
+                scoped_ptr<PlanStage> limitStage(new LimitStage(count, &ws, getMS(&ws)));
+                ASSERT_EQUALS(min(N, count), countResults(limitStage.get()));
+            }
+        }
+    };
+
+ // Suite "query_stage_limit_skip": adds the single limit/skip test above. The instance
+ // 'queryStageLimitSkipAll' is defined together with the class.
+ class All : public Suite {
+ public:
+ All() : Suite( "query_stage_limit_skip" ) { }
+
+ void setupTests() {
+ add<LimitSkipBasicTest>();
+ }
+ } queryStageLimitSkipAll;
+
+} // namespace