diff options
author | Hari Khalsa <hkhalsa@10gen.com> | 2013-07-08 14:44:32 -0400 |
---|---|---|
committer | Hari Khalsa <hkhalsa@10gen.com> | 2013-07-09 14:10:54 -0400 |
commit | b8f0ec598013009c56dee527e76429ffa7b8c394 (patch) | |
tree | a7cf31d7fbf6bc525f4b1ded421dad8a97aed9a5 | |
parent | fedd312c424fc5f82f7be1dad13f3dd74403c4a4 (diff) | |
download | mongo-b8f0ec598013009c56dee527e76429ffa7b8c394.tar.gz |
SERVER-10026 fetch limit skip or
-rw-r--r-- | jstests/stages_fetch.js | 33 | ||||
-rw-r--r-- | jstests/stages_limit_skip.js | 34 | ||||
-rw-r--r-- | jstests/stages_or.js | 33 | ||||
-rw-r--r-- | src/mongo/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/exec/SConscript | 51 | ||||
-rw-r--r-- | src/mongo/db/exec/fetch.cpp | 171 | ||||
-rw-r--r-- | src/mongo/db/exec/fetch.h | 69 | ||||
-rw-r--r-- | src/mongo/db/exec/limit.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/limit.h | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/mock_stage.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/mock_stage.h | 73 | ||||
-rw-r--r-- | src/mongo/db/exec/or.cpp | 111 | ||||
-rw-r--r-- | src/mongo/db/exec/or.h | 68 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stage.h | 29 | ||||
-rw-r--r-- | src/mongo/db/exec/simple_plan_runner.h | 27 | ||||
-rw-r--r-- | src/mongo/db/exec/skip.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/exec/skip.h | 51 | ||||
-rw-r--r-- | src/mongo/db/exec/stagedebug_cmd.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/matcher/matcher.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/pdfile.h | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_fetch.cpp | 389 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_limit_skip.cpp | 92 |
22 files changed, 1498 insertions, 23 deletions
diff --git a/jstests/stages_fetch.js b/jstests/stages_fetch.js new file mode 100644 index 00000000000..3e2c01df91a --- /dev/null +++ b/jstests/stages_fetch.js @@ -0,0 +1,33 @@ +// Test basic fetch functionality. +t = db.stages_fetch; +t.drop(); + +var N = 50; +for (var i = 0; i < N; ++i) { + t.insert({foo: i, bar: N - i, baz: i}); +} + +t.ensureIndex({foo: 1}); + +// 20 <= foo <= 30 +// bar == 25 (not covered, should error.) +ixscan1 = {ixscan: {args:{name: "stages_fetch", keyPattern:{foo:1}, + startKey: {"": 20}, + endKey: {"" : 30}, endKeyInclusive: true, + direction: 1}, + filter: {bar: 25}}}; +res = db.runCommand({stageDebug: ixscan1}); +assert(db.getLastError()); +assert.eq(res.ok, 0); + +// Now, add a fetch. We should be able to filter on the non-covered field since we fetched the obj. +ixscan2 = {ixscan: {args:{name: "stages_fetch", keyPattern:{foo:1}, + startKey: {"": 20}, + endKey: {"" : 30}, endKeyInclusive: true, + direction: 1}}} +fetch = {fetch: {args: {node: ixscan2}, filter: {bar: 25}}} +res = db.runCommand({stageDebug: fetch}); +printjson(res); +assert(!db.getLastError()); +assert.eq(res.ok, 1); +assert.eq(res.results.length, 1); diff --git a/jstests/stages_limit_skip.js b/jstests/stages_limit_skip.js new file mode 100644 index 00000000000..9441e4cd65b --- /dev/null +++ b/jstests/stages_limit_skip.js @@ -0,0 +1,34 @@ +// Test limit and skip +t = db.stages_limit_skip; +t.drop(); + +var N = 50; +for (var i = 0; i < N; ++i) { + t.insert({foo: i, bar: N - i, baz: i}); +} + +t.ensureIndex({foo: 1}) + +// foo <= 20, decreasing +// Limit of 5 results. 
+ixscan1 = {ixscan: {args:{name: "stages_limit_skip", keyPattern:{foo: 1}, + startKey: {"": 20}, + endKey: {}, endKeyInclusive: true, + direction: -1}}}; +limit1 = {limit: {args: {node: ixscan1, num: 5}}} +res = db.runCommand({stageDebug: limit1}); +assert(!db.getLastError()); +assert.eq(res.ok, 1); +assert.eq(res.results.length, 5); +assert.eq(res.results[0].foo, 20); +assert.eq(res.results[4].foo, 16); + +// foo <= 20, decreasing +// Skip 5 results. +skip1 = {skip: {args: {node: ixscan1, num: 5}}} +res = db.runCommand({stageDebug: skip1}); +assert(!db.getLastError()); +assert.eq(res.ok, 1); +assert.eq(res.results.length, 16); +assert.eq(res.results[0].foo, 15); +assert.eq(res.results[res.results.length - 1].foo, 0); diff --git a/jstests/stages_or.js b/jstests/stages_or.js new file mode 100644 index 00000000000..bb0e02b11d4 --- /dev/null +++ b/jstests/stages_or.js @@ -0,0 +1,33 @@ +// Test basic OR functionality +t = db.stages_or; +t.drop(); + +var N = 50; +for (var i = 0; i < N; ++i) { + t.insert({foo: i, bar: N - i, baz: i}); +} + +t.ensureIndex({foo: 1}) +t.ensureIndex({bar: 1}) +t.ensureIndex({baz: 1}) + +// baz >= 40 +ixscan1 = {ixscan: {args:{name: "stages_or", keyPattern:{baz: 1}, + startKey: {"": 40}, endKey: {}, + endKeyInclusive: true, direction: 1}}}; +// foo >= 40 +ixscan2 = {ixscan: {args:{name: "stages_or", keyPattern:{foo: 1}, + startKey: {"": 40}, endKey: {}, + endKeyInclusive: true, direction: 1}}}; + +// OR of baz and foo. Baz == foo and we dedup. +orix1ix2 = {or: {args: {nodes: [ixscan1, ixscan2], dedup:true}}}; +res = db.runCommand({stageDebug: orix1ix2}); +assert.eq(res.ok, 1); +assert.eq(res.results.length, 10); + +// No deduping, 2x the results. 
+orix1ix2nodd = {or: {args: {nodes: [ixscan1, ixscan2], dedup:false}}}; +res = db.runCommand({stageDebug: orix1ix2nodd}); +assert.eq(res.ok, 1); +assert.eq(res.results.length, 20); diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 2894b605eff..180a8f05971 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -17,6 +17,7 @@ Import("darwin windows solaris linux nix") env.SConscript(['base/SConscript', 'db/auth/SConscript', + 'db/exec/SConscript', 'db/fts/SConscript', 'db/ops/SConscript', 'db/SConscript', @@ -348,7 +349,7 @@ env.StaticLibrary("coredb", [ 'expressions', 'expressions_geo', 'expressions_where', - 'working_set', + 'db/exec/working_set', '$BUILD_DIR/mongo/foundation']) coreServerFiles = [ "db/client_basic.cpp", @@ -463,11 +464,6 @@ serverOnlyFiles = [ "db/curop.cpp", "db/prefetch.cpp", "db/repl/write_concern.cpp", "db/btreecursor.cpp", - "db/exec/and_hash.cpp", - "db/exec/and_sorted.cpp", - "db/exec/index_scan.cpp", - "db/exec/stagedebug_cmd.cpp", - "db/exec/working_set_common.cpp", "db/index_legacy.cpp", "db/index_selection.cpp", "db/index/2d_access_method.cpp", @@ -669,10 +665,6 @@ serverOnlyFiles += [ "s/d_logic.cpp", env.StaticLibrary("defaultversion", "s/default_version.cpp") -# Query execution -env.StaticLibrary("working_set", [ "db/exec/working_set.cpp", ], LIBDEPS = [ "bson",]) -env.CppUnitTest("working_set_test", [ "db/exec/working_set_test.cpp" ], LIBDEPS = ["working_set"]) - # Geo env.StaticLibrary("geometry", [ "db/geo/hash.cpp", "db/geo/shapes.cpp", ], LIBDEPS = [ "bson" ]) env.StaticLibrary("geoparser", [ "db/geo/geoparser.cpp", ], @@ -730,6 +722,7 @@ env.StaticLibrary("serveronly", serverOnlyFiles, 's/metadata', "working_set", "writebatch", + "db/exec/exec", '$BUILD_DIR/third_party/shim_snappy']) @@ -827,6 +820,7 @@ test = testEnv.Install( "gridfs", "s/upgrade", "mocklib", + "db/exec/mock_stage", "$BUILD_DIR/mongo/db/auth/authmocks"])) if len(testEnv.subst('$PROGSUFFIX')): diff --git a/src/mongo/db/exec/SConscript 
b/src/mongo/db/exec/SConscript new file mode 100644 index 00000000000..c0606c4ab1d --- /dev/null +++ b/src/mongo/db/exec/SConscript @@ -0,0 +1,51 @@ +# -*- mode: python -*- + +Import("env") + +env.StaticLibrary( + target = "working_set", + source = [ + "working_set.cpp", + ], + LIBDEPS = [ + "$BUILD_DIR/mongo/bson", + ], +) + +env.CppUnitTest( + target = "working_set_test", + source = [ + "working_set_test.cpp" + ], + LIBDEPS = [ + "working_set", + ], +) + +env.StaticLibrary( + target = "mock_stage", + source = [ + "mock_stage.cpp", + ], + LIBDEPS = [ + "working_set", + ], +) + +env.StaticLibrary( + target = 'exec', + source = [ + "and_hash.cpp", + "and_sorted.cpp", + "fetch.cpp", + "index_scan.cpp", + "limit.cpp", + "or.cpp", + "skip.cpp", + "stagedebug_cmd.cpp", + "working_set_common.cpp", + ], + LIBDEPS = [ + "$BUILD_DIR/mongo/bson" + ], +) diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp new file mode 100644 index 00000000000..d6f81c04693 --- /dev/null +++ b/src/mongo/db/exec/fetch.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/fetch.h" + +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pdfile.h" +#include "mongo/util/fail_point_service.h" + +namespace mongo { + + // Some fail points for testing. 
+ MONGO_FP_DECLARE(fetchInMemoryFail); + MONGO_FP_DECLARE(fetchInMemorySucceed); + + FetchStage::FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher) + : _ws(ws), _child(child), _matcher(matcher), _idBeingPagedIn(WorkingSet::INVALID_ID) { } + + FetchStage::~FetchStage() { } + + bool FetchStage::isEOF() { + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + // We asked our parent for a page-in but he didn't get back to us. We still need to + // return the result that _idBeingPagedIn refers to. + return false; + } + + return _child->isEOF(); + } + + bool recordInMemory(const char* data) { + if (MONGO_FAIL_POINT(fetchInMemoryFail)) { + return false; + } + + if (MONGO_FAIL_POINT(fetchInMemorySucceed)) { + return true; + } + + return Record::likelyInPhysicalMemory(data); + } + + PlanStage::StageState FetchStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + // If we asked our parent for a page-in last time work(...) was called, finish the fetch. + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + return fetchCompleted(out); + } + + // If we're here, we're not waiting for a DiskLoc to be fetched. Get another to-be-fetched + // result from our child. + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + WorkingSetMember* member = _ws->get(id); + + // If there's an obj there, there is no fetching to perform. + if (member->hasObj()) { + return returnIfMatches(member, id, out); + } + + // We need a valid loc to fetch from and this is the only state that has one. + verify(WorkingSetMember::LOC_AND_IDX == member->state); + verify(member->hasLoc()); + + Record* record = member->loc.rec(); + const char* data = record->dataNoThrowing(); + + if (!recordInMemory(data)) { + // member->loc points to a record that's NOT in memory. Pass a fetch request up. 
+ verify(WorkingSet::INVALID_ID == _idBeingPagedIn); + _idBeingPagedIn = id; + *out = id; + return PlanStage::NEED_FETCH; + } + else { + // Don't need index data anymore as we have an obj. + member->keyData.clear(); + member->obj = BSONObj(data); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + return returnIfMatches(member, id, out); + } + } + else { + // NEED_TIME/YIELD, ERROR, IS_EOF + return status; + } + } + + void FetchStage::prepareToYield() { _child->prepareToYield(); } + + void FetchStage::recoverFromYield() { _child->recoverFromYield(); } + + void FetchStage::invalidate(const DiskLoc& dl) { + _child->invalidate(dl); + + // If we're holding on to an object that we're waiting for the runner to page in... + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + WorkingSetMember* member = _ws->get(_idBeingPagedIn); + verify(member->hasLoc()); + // The DiskLoc is about to perish so we force a fetch of the data. + if (member->loc == dl) { + // This is a fetch inside of a write lock (that somebody else holds) but the other + // holder is likely operating on this object so this shouldn't have to hit disk. + WorkingSetCommon::fetchAndInvalidateLoc(member); + } + } + } + + PlanStage::StageState FetchStage::fetchCompleted(WorkingSetID* out) { + WorkingSetMember* member = _ws->get(_idBeingPagedIn); + + // The DiskLoc we're waiting to page in was invalidated (forced fetch). Test for + // matching and maybe pass it up. + if (member->state == WorkingSetMember::OWNED_OBJ) { + WorkingSetID memberID = _idBeingPagedIn; + _idBeingPagedIn = WorkingSet::INVALID_ID; + return returnIfMatches(member, memberID, out); + } + + // Assume that the caller has fetched appropriately. + // TODO: Do we want to double-check the runner? Not sure how reliable likelyInMemory is + // on all platforms. + verify(member->hasLoc()); + verify(!member->hasObj()); + + // Make the (unowned) object. 
+ Record* record = member->loc.rec(); + const char* data = record->dataNoThrowing(); + member->obj = BSONObj(data); + + // Don't need index data anymore as we have an obj. + member->keyData.clear(); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + verify(!member->obj.isOwned()); + + // Return the obj if it passes our filter. + WorkingSetID memberID = _idBeingPagedIn; + _idBeingPagedIn = WorkingSet::INVALID_ID; + return returnIfMatches(member, memberID, out); + } + + PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member, + WorkingSetID memberID, + WorkingSetID* out) { + if (NULL == _matcher || _matcher->matches(member)) { + *out = memberID; + return PlanStage::ADVANCED; + } + else { + _ws->free(memberID); + return PlanStage::NEED_TIME; + } + } + +} // namespace mongo diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h new file mode 100644 index 00000000000..67b6c026d43 --- /dev/null +++ b/src/mongo/db/exec/fetch.h @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/matcher.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage turns a DiskLoc into a BSONObj. 
+ * + * In WorkingSetMember terms, it transitions from LOC_AND_IDX to LOC_AND_UNOWNED_OBJ by reading + * the record at the provided loc. Returns verbatim any data that already has an object. + * + * Preconditions: Valid DiskLoc. + */ + class FetchStage : public PlanStage { + public: + FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher); + virtual ~FetchStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + /** + * If the member (with id memberID) passes our filter, set *out to memberID and return + * ADVANCED. Otherwise, free memberID and return NEED_TIME. + */ + StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, + WorkingSetID* out); + + /** + * work(...) delegates to this when we're called after requesting a fetch. + */ + StageState fetchCompleted(WorkingSetID* out); + + // _ws is not owned by us. + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + scoped_ptr<Matcher> _matcher; + + // If we're fetching a DiskLoc and it points at something that's not in memory, we return + // a "please page this in" result and hold on to the WSID until the next call to work(...). + WorkingSetID _idBeingPagedIn; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/limit.cpp b/src/mongo/db/exec/limit.cpp new file mode 100644 index 00000000000..9f394e6f508 --- /dev/null +++ b/src/mongo/db/exec/limit.cpp @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/limit.h" + +namespace mongo { + + LimitStage::LimitStage(int limit, WorkingSet* ws, PlanStage* child) + : _ws(ws), _child(child), _numToReturn(limit) { } + + LimitStage::~LimitStage() { } + + bool LimitStage::isEOF() { return (0 == _numToReturn) || _child->isEOF(); } + + PlanStage::StageState LimitStage::work(WorkingSetID* out) { + // If we've returned as many results as we're limited to, isEOF will be true. + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + *out = id; + --_numToReturn; + return PlanStage::ADVANCED; + } + else { + // NEED_TIME/YIELD, ERROR, IS_EOF + return status; + } + } + + void LimitStage::prepareToYield() { _child->prepareToYield(); } + + void LimitStage::recoverFromYield() { _child->recoverFromYield(); } + + void LimitStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); } + +} // namespace mongo diff --git a/src/mongo/db/exec/limit.h b/src/mongo/db/exec/limit.h new file mode 100644 index 00000000000..eb674d8deed --- /dev/null +++ b/src/mongo/db/exec/limit.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. 
If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage implements limit functionality. It only returns 'limit' results before EOF. + * + * Sort has a baked-in limit, as it can optimize the sort if it has a limit. + * + * Preconditions: None. + */ + class LimitStage : public PlanStage { + public: + LimitStage(int limit, WorkingSet* ws, PlanStage* child); + virtual ~LimitStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + + // We only return this many results. + int _numToReturn; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/mock_stage.cpp b/src/mongo/db/exec/mock_stage.cpp new file mode 100644 index 00000000000..128c93e1847 --- /dev/null +++ b/src/mongo/db/exec/mock_stage.cpp @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include "mongo/db/exec/mock_stage.h" + +namespace mongo { + + MockStage::MockStage(WorkingSet* ws) : _ws(ws) { } + + PlanStage::StageState MockStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + StageState state = _results.front(); + _results.pop(); + + if (PlanStage::ADVANCED == state) { + // We advanced. Put the mock obj into the working set. + WorkingSetID id = _ws->allocate(); + WorkingSetMember* member = _ws->get(id); + *member = _members.front(); + _members.pop(); + *out = id; + } + + return state; + } + + bool MockStage::isEOF() { return _results.empty(); } + + void MockStage::pushBack(const PlanStage::StageState state) { + _results.push(state); + } + + void MockStage::pushBack(const WorkingSetMember& member) { + _results.push(PlanStage::ADVANCED); + _members.push(member); + } + +} // namespace mongo diff --git a/src/mongo/db/exec/mock_stage.h b/src/mongo/db/exec/mock_stage.h new file mode 100644 index 00000000000..49b44fccf82 --- /dev/null +++ b/src/mongo/db/exec/mock_stage.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <queue> + +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/working_set.h" + +namespace mongo { + + class DiskLoc; + + /** + * MockStage is a data-producing stage that is used for testing. 
Unlike the other two leaf + * stages (CollectionScan and IndexScan) MockStage does not require any underlying storage + * layer. + * + * A MockStage is "programmed" by pushing return values from work() onto its internal queue. + * Calls to MockStage::work() pop values off that queue and return them in FIFO order, + * annotating the working set with data when appropriate. + */ + class MockStage : public PlanStage { + public: + MockStage(WorkingSet* ws); + virtual ~MockStage() { } + + virtual StageState work(WorkingSetID* out); + + virtual bool isEOF(); + + // These don't really mean anything here. + // Some day we could count the # of calls to the yield functions to check that other stages + // have correct yielding behavior. + virtual void prepareToYield() { } + virtual void recoverFromYield() { } + virtual void invalidate(const DiskLoc& dl) { } + + /** + * Add a result to the back of the queue. work() goes through the queue. + * Either no data is returned (just a state), or... + */ + void pushBack(const PlanStage::StageState state); + + /** + * ...data is returned (and we ADVANCED) + */ + void pushBack(const WorkingSetMember& member); + + private: + // We don't own this. + WorkingSet* _ws; + + // The data we return. + queue<PlanStage::StageState> _results; + queue<WorkingSetMember> _members; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/or.cpp b/src/mongo/db/exec/or.cpp new file mode 100644 index 00000000000..a0a678e2b95 --- /dev/null +++ b/src/mongo/db/exec/or.cpp @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/or.h" + +namespace mongo { + + OrStage::OrStage(WorkingSet* ws, bool dedup, Matcher* matcher) + : _ws(ws), _matcher(matcher), _currentChild(0), _dedup(dedup) { } + + OrStage::~OrStage() { + for (size_t i = 0; i < _children.size(); ++i) { + delete _children[i]; + } + } + + void OrStage::addChild(PlanStage* child) { _children.push_back(child); } + + bool OrStage::isEOF() { return _currentChild >= _children.size(); } + + PlanStage::StageState OrStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState childStatus = _children[_currentChild]->work(&id); + + if (PlanStage::ADVANCED == childStatus) { + WorkingSetMember* member = _ws->get(id); + verify(member->hasLoc()); + + // If we're deduping... + if (_dedup) { + // ...and we've seen the DiskLoc before + if (_seen.end() != _seen.find(member->loc)) { + // ...drop it. + _ws->free(id); + return PlanStage::NEED_TIME; + } + else { + // Otherwise, note that we've seen it. + _seen.insert(member->loc); + } + } + + if (NULL == _matcher || _matcher->matches(member)) { + // Match! return it. + *out = id; + return PlanStage::ADVANCED; + } + else { + // Does not match, try again. + _ws->free(id); + return PlanStage::NEED_TIME; + } + } + else if (PlanStage::IS_EOF == childStatus) { + // Done with _currentChild, move to the next one. + ++_currentChild; + + // Maybe we're out of children. + if (isEOF()) { + return PlanStage::IS_EOF; + } + else { + return PlanStage::NEED_TIME; + } + } + else { + // NEED_TIME, ERROR, NEED_YIELD, pass them up. 
+ return childStatus; + } + } + + void OrStage::prepareToYield() { + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->prepareToYield(); + } + } + + void OrStage::recoverFromYield() { + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->recoverFromYield(); + } + } + + void OrStage::invalidate(const DiskLoc& dl) { + if (isEOF()) { return; } + + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->invalidate(dl); + } + + // If we see DL again it is not the same record as it once was so we still want to + // return it. + if (_dedup) { _seen.erase(dl); } + } + +} // namespace mongo diff --git a/src/mongo/db/exec/or.h b/src/mongo/db/exec/or.h new file mode 100644 index 00000000000..8df29a0c00b --- /dev/null +++ b/src/mongo/db/exec/or.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/matcher.h" +#include "mongo/db/exec/plan_stage.h" +#include "mongo/platform/unordered_set.h" + +namespace mongo { + + /** + * This stage outputs the union of its children. It optionally deduplicates on DiskLoc. + * + * Preconditions: Valid DiskLoc. + * + * If we're deduping, we may fail to dedup any invalidated DiskLoc properly. 
+ */ + class OrStage : public PlanStage { + public: + OrStage(WorkingSet* ws, bool dedup, Matcher* matcher); + virtual ~OrStage(); + + void addChild(PlanStage* child); + + virtual bool isEOF(); + + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + // Not owned by us. + WorkingSet* _ws; + + scoped_ptr<Matcher> _matcher; + + // Owned by us. + vector<PlanStage*> _children; + + // Which of _children are we calling work(...) on now? + size_t _currentChild; + + // True if we dedup on DiskLoc, false otherwise. + bool _dedup; + + // Which DiskLocs have we returned? + unordered_set<DiskLoc, DiskLoc::Hasher> _seen; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h index c1a232b3b2c..0532ef1cecf 100644 --- a/src/mongo/db/exec/plan_stage.h +++ b/src/mongo/db/exec/plan_stage.h @@ -94,16 +94,33 @@ namespace mongo { * All possible return values of work(...) */ enum StageState { - // work(...) has returned a new result in its out parameter. + // work(...) has returned a new result in its out parameter. The caller must free it + // from the working set when done with it. ADVANCED, - // work(...) won't do anything more. isEOF() will also be true. + + // work(...) won't do anything more. isEOF() will also be true. There is nothing + // output in the out parameter. IS_EOF, - // work(...) needs more time to product a result. Call work(...) again. + + // work(...) needs more time to produce a result. Call work(...) again. There is + // nothing output in the out parameter. NEED_TIME, - // Something has gone unrecoverably wrong. Stop running this query. + + // Something has gone unrecoverably wrong. Stop running this query. There is nothing + // output in the out parameter. FAILURE, - // Something isn't in memory. Fetch it. TODO: actually support this (forthcoming). 
- NEED_YIELD, + + // Something isn't in memory. Fetch it. + // + // Full fetch semantics: + // The fetch-requesting stage populates the out parameter of work(...) with a WSID that + // refers to a WSM with a valid loc. Each stage that receives a NEED_FETCH from a child + // must propagate the NEED_FETCH up and perform no work. The plan runner is responsible + // for paging in the data upon receipt of a NEED_FETCH. The plan runner does NOT free + // the WSID of the requested fetch. The stage that requested the fetch holds the WSID + // of the loc it wants fetched. On the next call to work() that stage can assume a + // fetch was performed on the WSM that the held WSID refers to. + NEED_FETCH, }; /** diff --git a/src/mongo/db/exec/simple_plan_runner.h b/src/mongo/db/exec/simple_plan_runner.h index 34ca0b2f29a..797c3c9c9ec 100644 --- a/src/mongo/db/exec/simple_plan_runner.h +++ b/src/mongo/db/exec/simple_plan_runner.h @@ -17,6 +17,7 @@ #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pdfile.h" namespace mongo { @@ -26,7 +27,6 @@ namespace mongo { * * TODO: Yielding policy * TODO: Graceful error handling - * TODO: Fetch * TODO: Stats, diagnostics, instrumentation, etc. */ class SimplePlanRunner { @@ -59,8 +59,31 @@ namespace mongo { else if (code == PlanStage::NEED_TIME) { // TODO: Occasionally yield. For now, we run until we get another result. } + else if (PlanStage::NEED_FETCH == code) { + // id has a loc and refers to an obj we need to fetch. + WorkingSetMember* member = _workingSet.get(id); + + // This must be true for somebody to request a fetch and can only change when an + // invalidation happens, which is when we give up a lock. Don't give up the + // lock between receiving the NEED_FETCH and actually fetching(?). + verify(member->hasLoc()); + + // Actually bring record into memory. 
+ Record* record = member->loc.rec(); + record->touch(); + + // Record should be in memory now. Log if it's not. + if (!Record::likelyInPhysicalMemory(record->dataNoThrowing())) { + OCCASIONALLY { + warning() << "Record wasn't in memory immediately after fetch: " + << member->loc.toString() << endl; + } + } + + // Note that we're not freeing id. Fetch semantics say that we shouldn't. + } else { - // IS_EOF, FAILURE, NEED_YIELD. We just stop here. + // IS_EOF, FAILURE. We just stop here. return false; } } diff --git a/src/mongo/db/exec/skip.cpp b/src/mongo/db/exec/skip.cpp new file mode 100644 index 00000000000..8dfb4e33df3 --- /dev/null +++ b/src/mongo/db/exec/skip.cpp @@ -0,0 +1,58 @@ +/** +* Copyright (C) 2013 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "mongo/db/exec/skip.h" + +namespace mongo { + + SkipStage::SkipStage(int toSkip, WorkingSet* ws, PlanStage* child) + : _ws(ws), _child(child), _toSkip(toSkip) { } + + SkipStage::~SkipStage() { } + + bool SkipStage::isEOF() { return _child->isEOF(); } + + PlanStage::StageState SkipStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + // If we're still skipping results... + if (_toSkip > 0) { + // ...drop the result. 
+ --_toSkip; + _ws->free(id); + return PlanStage::NEED_TIME; + } + + *out = id; + return PlanStage::ADVANCED; + } + else { + // NEED_TIME, NEED_FETCH, FAILURE, IS_EOF + return status; + } + } + + void SkipStage::prepareToYield() { _child->prepareToYield(); } + + void SkipStage::recoverFromYield() { _child->recoverFromYield(); } + + void SkipStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); } + +} // namespace mongo diff --git a/src/mongo/db/exec/skip.h b/src/mongo/db/exec/skip.h new file mode 100644 index 00000000000..5086e9d66aa --- /dev/null +++ b/src/mongo/db/exec/skip.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage implements skip functionality. It drops the first 'toSkip' results from its child + * then returns the rest verbatim. + * + * Preconditions: None. 
+ */ + class SkipStage : public PlanStage { + public: + SkipStage(int toSkip, WorkingSet* ws, PlanStage* child); + virtual ~SkipStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + + // We drop the first _toSkip results that we would have returned. + int _toSkip; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index e15e8a96489..d55a9dd497b 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -20,7 +20,11 @@ #include "mongo/db/commands.h" #include "mongo/db/exec/and_hash.h" #include "mongo/db/exec/and_sorted.h" +#include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" +#include "mongo/db/exec/limit.h" +#include "mongo/db/exec/or.h" +#include "mongo/db/exec/skip.h" #include "mongo/db/exec/simple_plan_runner.h" #include "mongo/db/index/catalog_hack.h" #include "mongo/db/jsobj.h" @@ -49,16 +53,16 @@ namespace mongo { * * node -> {andHash: {filter: {filter}, args: { nodes: [node, node]}}} * node -> {andSorted: {filter: {filter}, args: { nodes: [node, node]}}} + * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}} + * node -> {fetch: {filter: {filter}, args: {node: node}}} + * node -> {limit: {args: {node: node, num: posint}}} + * node -> {skip: {args: {node: node, num: posint}}} * * Forthcoming Nodes: * * node -> {cscan: {filter: {filter}, args: {name: "collectionname" }}} - * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}} - * node -> {fetch: {filter: {filter}, args: {node: node}}} * node -> {sort: {filter: {filter}, args: {node: node, pattern: objWithSortCriterion}}} * node -> {dedup: {filter: {filter}, args: {node: node, field: field}}} - * node -> {limit: {filter: filter}, args: {node: 
node, num: posint}} - * node -> {skip: {filter: filter}, args: {node: node, num: posint}} * node -> {unwind: {filter: filter}, args: {node: node, field: field}} */ class StageDebugCmd : public Command { @@ -203,6 +207,52 @@ namespace mongo { return andStage.release(); } + else if ("or" == nodeName) { + uassert(16934, "Nodes argument must be provided to AND", + nodeArgs["nodes"].isABSONObj()); + uassert(16935, "Dedup argument must be provided to OR", + !nodeArgs["dedup"].eoo()); + BSONObjIterator it(nodeArgs["nodes"].Obj()); + auto_ptr<OrStage> orStage(new OrStage(workingSet, nodeArgs["dedup"].Bool(), + matcher.release())); + while (it.more()) { + BSONElement e = it.next(); + if (!e.isABSONObj()) { return NULL; } + PlanStage* subNode = parseQuery(dbname, e.Obj(), workingSet); + uassert(16936, "Can't parse sub-node of OR: " + e.Obj().toString(), + NULL != subNode); + // takes ownership + orStage->addChild(subNode); + } + + return orStage.release(); + } + else if ("fetch" == nodeName) { + uassert(16929, "Node argument must be provided to fetch", + nodeArgs["node"].isABSONObj()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new FetchStage(workingSet, subNode, matcher.release()); + } + else if ("limit" == nodeName) { + uassert(16937, "Limit stage doesn't have a filter (put it on the child)", + NULL == matcher.get()); + uassert(16930, "Node argument must be provided to limit", + nodeArgs["node"].isABSONObj()); + uassert(16931, "Num argument must be provided to limit", + nodeArgs["num"].isNumber()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new LimitStage(nodeArgs["num"].numberInt(), workingSet, subNode); + } + else if ("skip" == nodeName) { + uassert(16938, "Skip stage doesn't have a filter (put it on the child)", + NULL == matcher.get()); + uassert(16932, "Node argument must be provided to skip", + nodeArgs["node"].isABSONObj()); + uassert(16933, "Num argument must be provided to 
skip", + nodeArgs["num"].isNumber()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new SkipStage(nodeArgs["num"].numberInt(), workingSet, subNode); + } else { return NULL; } diff --git a/src/mongo/db/matcher/matcher.cpp b/src/mongo/db/matcher/matcher.cpp index 8cb4a0b4286..be47ae91624 100644 --- a/src/mongo/db/matcher/matcher.cpp +++ b/src/mongo/db/matcher/matcher.cpp @@ -19,7 +19,6 @@ #include "mongo/pch.h" #include "mongo/base/init.h" -#include "mongo/db/index/index_descriptor.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/matcher/matcher.h" diff --git a/src/mongo/db/pdfile.h b/src/mongo/db/pdfile.h index b1ff2c118a0..68fd32d0f89 100644 --- a/src/mongo/db/pdfile.h +++ b/src/mongo/db/pdfile.h @@ -259,6 +259,9 @@ namespace mongo { const char * data() const { _accessing(); return _data; } char * data() { _accessing(); return _data; } + const char * dataNoThrowing() const { return _data; } + char * dataNoThrowing() { return _data; } + int netLength() const { _accessing(); return _netLength(); } /* use this when a record is deleted. basically a union with next/prev fields */ diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp new file mode 100644 index 00000000000..ab34f2823fb --- /dev/null +++ b/src/mongo/dbtests/query_stage_fetch.cpp @@ -0,0 +1,389 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * This file tests db/exec/fetch.cpp. Fetch goes to disk so we cannot test outside of a dbtest. + */ + +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/cursor.h" +#include "mongo/db/exec/fetch.h" +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/mock_stage.h" +#include "mongo/db/instance.h" +#include "mongo/db/json.h" +#include "mongo/db/matcher.h" +#include "mongo/db/pdfile.h" +#include "mongo/dbtests/dbtests.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/fail_point_registry.h" +#include "mongo/util/fail_point_service.h" + +namespace QueryStageFetch { + + class QueryStageFetchBase { + public: + QueryStageFetchBase() { } + + virtual ~QueryStageFetchBase() { + _client.dropCollection(ns()); + } + + void getLocs(set<DiskLoc>* out) { + for (shared_ptr<Cursor> c = theDataFileMgr.findAll(ns()); c->ok(); c->advance()) { + out->insert(c->currLoc()); + } + } + + void insert(const BSONObj& obj) { + _client.insert(ns(), obj); + } + + void remove(const BSONObj& obj) { + _client.remove(ns(), obj); + } + + static const char* ns() { return "unittests.QueryStageFetch"; } + + private: + static DBDirectClient _client; + }; + + DBDirectClient QueryStageFetchBase::_client; + + // + // Test that a fetch is passed up when it's not in memory. + // + class FetchStageNotInMemory : public QueryStageFetchBase { + public: + void run() { + Client::WriteContext ctx(ns()); + WorkingSet ws; + + // Add an object to the DB. + insert(BSON("foo" << 5)); + set<DiskLoc> locs; + getLocs(&locs); + ASSERT_EQUALS(size_t(1), locs.size()); + + // Create a mock stage that returns the WSM. + auto_ptr<MockStage> mockStage(new MockStage(&ws)); + + // Mock data. 
+ { + WorkingSetMember mockMember; + mockMember.state = WorkingSetMember::LOC_AND_IDX; + mockMember.loc = *locs.begin(); + + // State is loc and index, shouldn't be able to get the foo data inside. + BSONElement elt; + ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt)); + mockStage->pushBack(mockMember); + } + + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + + // Set the fail point to return not in memory. + FailPointRegistry* reg = getGlobalFailPointRegistry(); + FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail"); + fetchInMemoryFail->setMode(FailPoint::alwaysOn); + + // First call should return a fetch request as it's not in memory. + WorkingSetID id; + PlanStage::StageState state; + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_FETCH, state); + + // Let's do the fetch ourselves (though it doesn't really matter) + WorkingSetMember* member = ws.get(id); + ASSERT_FALSE(member->hasObj()); + member->loc.rec()->touch(); + + // Next call to work() should give us the object in a diff. state + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + ASSERT_EQUALS(WorkingSetMember::LOC_AND_UNOWNED_OBJ, member->state); + + // We should be able to get data from the obj now. + BSONElement elt; + ASSERT_TRUE(member->getFieldDotted("foo", &elt)); + ASSERT_EQUALS(elt.numberInt(), 5); + + // Mock stage is EOF so fetch should be too. + ASSERT_TRUE(fetchStage->isEOF()); + + // Turn off fail point for further tests. + fetchInMemoryFail->setMode(FailPoint::off); + } + }; + + // + // Test that a fetch is not passed up when it's in memory. + // + class FetchStageInMemory : public QueryStageFetchBase { + public: + void run() { + Client::WriteContext ctx(ns()); + WorkingSet ws; + + // Add an object to the DB. + insert(BSON("foo" << 5)); + set<DiskLoc> locs; + getLocs(&locs); + ASSERT_EQUALS(size_t(1), locs.size()); + + // Create a mock stage that returns the WSM. 
+ auto_ptr<MockStage> mockStage(new MockStage(&ws)); + + // Mock data. + { + WorkingSetMember mockMember; + mockMember.state = WorkingSetMember::LOC_AND_IDX; + mockMember.loc = *locs.begin(); + + // State is loc and index, shouldn't be able to get the foo data inside. + BSONElement elt; + ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt)); + mockStage->pushBack(mockMember); + } + + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + + // Set the fail point to return in memory. + FailPointRegistry* reg = getGlobalFailPointRegistry(); + FailPoint* fetchInMemorySucceed = reg->getFailPoint("fetchInMemorySucceed"); + fetchInMemorySucceed->setMode(FailPoint::alwaysOn); + + // First call fetches as expected. + WorkingSetID id; + PlanStage::StageState state; + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + + // State should have changed. + WorkingSetMember* member = ws.get(id); + ASSERT_EQUALS(WorkingSetMember::LOC_AND_UNOWNED_OBJ, member->state); + + // We should be able to get data from the obj now. + BSONElement elt; + ASSERT_TRUE(member->getFieldDotted("foo", &elt)); + ASSERT_EQUALS(elt.numberInt(), 5); + + // Mock stage is EOF so fetch should be too. + ASSERT_TRUE(fetchStage->isEOF()); + + // Turn off fail point for further tests. + fetchInMemorySucceed->setMode(FailPoint::off); + } + }; + + // + // Test mid-fetch invalidation. + // + class FetchStageInvalidation : public QueryStageFetchBase { + public: + void run() { + Client::WriteContext ctx(ns()); + WorkingSet ws; + + // Add an object to the DB. + insert(BSON("foo" << 5)); + set<DiskLoc> locs; + getLocs(&locs); + ASSERT_EQUALS(size_t(1), locs.size()); + + // Create a mock stage that returns the WSM. + auto_ptr<MockStage> mockStage(new MockStage(&ws)); + + // Mock data. 
+ { + WorkingSetMember mockMember; + mockMember.state = WorkingSetMember::LOC_AND_IDX; + mockMember.loc = *locs.begin(); + + // State is loc and index, shouldn't be able to get the foo data inside. + BSONElement elt; + ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt)); + mockStage->pushBack(mockMember); + } + + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + + // Set the fail point to return not in memory. + FailPointRegistry* reg = getGlobalFailPointRegistry(); + FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail"); + fetchInMemoryFail->setMode(FailPoint::alwaysOn); + + // First call should return a fetch request as it's not in memory. + WorkingSetID id; + PlanStage::StageState state; + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_FETCH, state); + + WorkingSetMember* member = ws.get(id); + + // Invalidate the DL. + fetchStage->invalidate(member->loc); + + // Next call to work() should give us the OWNED obj as it was invalidated mid-page-in. + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + ASSERT_EQUALS(WorkingSetMember::OWNED_OBJ, member->state); + + // We should be able to get data from the obj now. + BSONElement elt; + ASSERT_TRUE(member->getFieldDotted("foo", &elt)); + ASSERT_EQUALS(elt.numberInt(), 5); + + // Mock stage is EOF so fetch should be too. + ASSERT_TRUE(fetchStage->isEOF()); + + // Turn off fail point for further tests. + fetchInMemoryFail->setMode(FailPoint::off); + } + }; + + + // + // Test that a WSM with an obj is passed through verbatim. + // + class FetchStageAlreadyFetched : public QueryStageFetchBase { + public: + void run() { + Client::WriteContext ctx(ns()); + WorkingSet ws; + + // Add an object to the DB. + insert(BSON("foo" << 5)); + set<DiskLoc> locs; + getLocs(&locs); + ASSERT_EQUALS(size_t(1), locs.size()); + + // Create a mock stage that returns the WSM. + auto_ptr<MockStage> mockStage(new MockStage(&ws)); + + // Mock data. 
+ { + WorkingSetMember mockMember; + mockMember.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + mockMember.loc = *locs.begin(); + mockMember.obj = mockMember.loc.obj(); + // Points into our DB. + ASSERT_FALSE(mockMember.obj.isOwned()); + mockStage->pushBack(mockMember); + + mockMember.state = WorkingSetMember::OWNED_OBJ; + mockMember.loc = DiskLoc(); + mockMember.obj = BSON("foo" << 6); + ASSERT_TRUE(mockMember.obj.isOwned()); + mockStage->pushBack(mockMember); + } + + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + + // Set the fail point to return not in memory so we get a fetch request. + FailPointRegistry* reg = getGlobalFailPointRegistry(); + FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail"); + fetchInMemoryFail->setMode(FailPoint::alwaysOn); + + WorkingSetID id; + PlanStage::StageState state; + + // Don't bother doing any fetching if an obj exists already. + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + + // No more data to fetch, so, EOF. + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::IS_EOF, state); + + fetchInMemoryFail->setMode(FailPoint::off); + } + }; + + // + // Test matching with fetch. + // + class FetchStageFilter : public QueryStageFetchBase { + public: + void run() { + Client::WriteContext ctx(ns()); + WorkingSet ws; + + // Add an object to the DB. + insert(BSON("foo" << 5)); + set<DiskLoc> locs; + getLocs(&locs); + ASSERT_EQUALS(size_t(1), locs.size()); + + // Create a mock stage that returns the WSM. + auto_ptr<MockStage> mockStage(new MockStage(&ws)); + + // Mock data. + { + WorkingSetMember mockMember; + mockMember.state = WorkingSetMember::LOC_AND_IDX; + mockMember.loc = *locs.begin(); + + // State is loc and index, shouldn't be able to get the foo data inside. 
+ BSONElement elt; + ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt)); + mockStage->pushBack(mockMember); + } + + // Matcher requires that foo==6 but we only have data with foo==5. + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), + new Matcher(BSON("foo" << 6)))); + + // Set the fail point to return not in memory so we get a fetch request. + FailPointRegistry* reg = getGlobalFailPointRegistry(); + FailPoint* fetchInMemoryFail = reg->getFailPoint("fetchInMemoryFail"); + fetchInMemoryFail->setMode(FailPoint::alwaysOn); + + // First call should return a fetch request as it's not in memory. + WorkingSetID id; + PlanStage::StageState state; + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_FETCH, state); + + // Normally we'd return the object but we have a filter that prevents it. + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + + // No more data to fetch, so, EOF. + state = fetchStage->work(&id); + ASSERT_EQUALS(PlanStage::IS_EOF, state); + + fetchInMemoryFail->setMode(FailPoint::off); + } + }; + + class All : public Suite { + public: + All() : Suite( "query_stage_fetch" ) { } + + void setupTests() { + add<FetchStageNotInMemory>(); + add<FetchStageInMemory>(); + add<FetchStageAlreadyFetched>(); + add<FetchStageInvalidation>(); + add<FetchStageFilter>(); + } + } queryStageFetchAll; + +} // namespace QueryStageFetch diff --git a/src/mongo/dbtests/query_stage_limit_skip.cpp b/src/mongo/dbtests/query_stage_limit_skip.cpp new file mode 100644 index 00000000000..c752b589889 --- /dev/null +++ b/src/mongo/dbtests/query_stage_limit_skip.cpp @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * This file tests db/exec/limit.cpp and db/exec/skip.cpp. + */ + +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/exec/limit.h" +#include "mongo/db/exec/mock_stage.h" +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/skip.h" +#include "mongo/db/instance.h" +#include "mongo/db/json.h" +#include "mongo/dbtests/dbtests.h" + +using namespace mongo; + +namespace { + + static const int N = 50; + + /* Populate a MockStage and return it. Caller owns it. */ + MockStage* getMS(WorkingSet* ws) { + auto_ptr<MockStage> ms(new MockStage(ws)); + + // Put N ADVANCED results into the mock stage, and some other stalling results (NEED_TIME/NEED_FETCH). + for (int i = 0; i < N; ++i) { + ms->pushBack(PlanStage::NEED_TIME); + WorkingSetMember wsm; + wsm.state = WorkingSetMember::OWNED_OBJ; + wsm.obj = BSON("x" << i); + ms->pushBack(wsm); + ms->pushBack(PlanStage::NEED_TIME); + ms->pushBack(PlanStage::NEED_FETCH); + } + + return ms.release(); + } + + int countResults(PlanStage* stage) { + int count = 0; + while (!stage->isEOF()) { + WorkingSetID id; + PlanStage::StageState status = stage->work(&id); + if (PlanStage::ADVANCED != status) { continue; } + ++count; + } + return count; + } + + // + // Produce 50 results from a mock stage. Limit/skip 0, 1, 2, ..., 99 of them and expect the right # of results. 
+ // + class LimitSkipBasicTest { + public: + void run() { + for (int i = 0; i < 2 * N; ++i) { + WorkingSet ws; + + scoped_ptr<PlanStage> skip(new SkipStage(i, &ws, getMS(&ws))); + ASSERT_EQUALS(max(0, N - i), countResults(skip.get())); + + scoped_ptr<PlanStage> limit(new LimitStage(i, &ws, getMS(&ws))); + ASSERT_EQUALS(min(N, i), countResults(limit.get())); + } + } + }; + + class All : public Suite { + public: + All() : Suite( "query_stage_limit_skip" ) { } + + void setupTests() { + add<LimitSkipBasicTest>(); + } + } queryStageLimitSkipAll; + +} // namespace |