diff options
author | Hari Khalsa <hkhalsa@10gen.com> | 2013-07-08 14:44:32 -0400 |
---|---|---|
committer | Hari Khalsa <hkhalsa@10gen.com> | 2013-07-09 14:10:54 -0400 |
commit | b8f0ec598013009c56dee527e76429ffa7b8c394 (patch) | |
tree | a7cf31d7fbf6bc525f4b1ded421dad8a97aed9a5 /src/mongo/db/exec | |
parent | fedd312c424fc5f82f7be1dad13f3dd74403c4a4 (diff) | |
download | mongo-b8f0ec598013009c56dee527e76429ffa7b8c394.tar.gz |
SERVER-10026 fetch limit skip or
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/SConscript | 51 | ||||
-rw-r--r-- | src/mongo/db/exec/fetch.cpp | 171 | ||||
-rw-r--r-- | src/mongo/db/exec/fetch.h | 69 | ||||
-rw-r--r-- | src/mongo/db/exec/limit.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/limit.h | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/mock_stage.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/exec/mock_stage.h | 73 | ||||
-rw-r--r-- | src/mongo/db/exec/or.cpp | 111 | ||||
-rw-r--r-- | src/mongo/db/exec/or.h | 68 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stage.h | 29 | ||||
-rw-r--r-- | src/mongo/db/exec/simple_plan_runner.h | 27 | ||||
-rw-r--r-- | src/mongo/db/exec/skip.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/exec/skip.h | 51 | ||||
-rw-r--r-- | src/mongo/db/exec/stagedebug_cmd.cpp | 58 |
14 files changed, 910 insertions, 12 deletions
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript new file mode 100644 index 00000000000..c0606c4ab1d --- /dev/null +++ b/src/mongo/db/exec/SConscript @@ -0,0 +1,51 @@ +# -*- mode: python -*- + +Import("env") + +env.StaticLibrary( + target = "working_set", + source = [ + "working_set.cpp", + ], + LIBDEPS = [ + "$BUILD_DIR/mongo/bson", + ], +) + +env.CppUnitTest( + target = "working_set_test", + source = [ + "working_set_test.cpp" + ], + LIBDEPS = [ + "working_set", + ], +) + +env.StaticLibrary( + target = "mock_stage", + source = [ + "mock_stage.cpp", + ], + LIBDEPS = [ + "working_set", + ], +) + +env.StaticLibrary( + target = 'exec', + source = [ + "and_hash.cpp", + "and_sorted.cpp", + "fetch.cpp", + "index_scan.cpp", + "limit.cpp", + "or.cpp", + "skip.cpp", + "stagedebug_cmd.cpp", + "working_set_common.cpp", + ], + LIBDEPS = [ + "$BUILD_DIR/mongo/bson" + ], +) diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp new file mode 100644 index 00000000000..d6f81c04693 --- /dev/null +++ b/src/mongo/db/exec/fetch.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/fetch.h" + +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pdfile.h" +#include "mongo/util/fail_point_service.h" + +namespace mongo { + + // Some fail points for testing. 
+ MONGO_FP_DECLARE(fetchInMemoryFail); + MONGO_FP_DECLARE(fetchInMemorySucceed); + + FetchStage::FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher) + : _ws(ws), _child(child), _matcher(matcher), _idBeingPagedIn(WorkingSet::INVALID_ID) { } + + FetchStage::~FetchStage() { } + + bool FetchStage::isEOF() { + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + // We asked our parent for a page-in but he didn't get back to us. We still need to + // return the result that _idBeingPagedIn refers to. + return false; + } + + return _child->isEOF(); + } + + bool recordInMemory(const char* data) { + if (MONGO_FAIL_POINT(fetchInMemoryFail)) { + return false; + } + + if (MONGO_FAIL_POINT(fetchInMemorySucceed)) { + return true; + } + + return Record::likelyInPhysicalMemory(data); + } + + PlanStage::StageState FetchStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + // If we asked our parent for a page-in last time work(...) was called, finish the fetch. + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + return fetchCompleted(out); + } + + // If we're here, we're not waiting for a DiskLoc to be fetched. Get another to-be-fetched + // result from our child. + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + WorkingSetMember* member = _ws->get(id); + + // If there's an obj there, there is no fetching to perform. + if (member->hasObj()) { + return returnIfMatches(member, id, out); + } + + // We need a valid loc to fetch from and this is the only state that has one. + verify(WorkingSetMember::LOC_AND_IDX == member->state); + verify(member->hasLoc()); + + Record* record = member->loc.rec(); + const char* data = record->dataNoThrowing(); + + if (!recordInMemory(data)) { + // member->loc points to a record that's NOT in memory. Pass a fetch request up. 
+ verify(WorkingSet::INVALID_ID == _idBeingPagedIn); + _idBeingPagedIn = id; + *out = id; + return PlanStage::NEED_FETCH; + } + else { + // Don't need index data anymore as we have an obj. + member->keyData.clear(); + member->obj = BSONObj(data); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + return returnIfMatches(member, id, out); + } + } + else { + // NEED_TIME, NEED_FETCH, FAILURE, IS_EOF + return status; + } + } + + void FetchStage::prepareToYield() { _child->prepareToYield(); } + + void FetchStage::recoverFromYield() { _child->recoverFromYield(); } + + void FetchStage::invalidate(const DiskLoc& dl) { + _child->invalidate(dl); + + // If we're holding on to an object that we're waiting for the runner to page in... + if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + WorkingSetMember* member = _ws->get(_idBeingPagedIn); + verify(member->hasLoc()); + // The DiskLoc is about to perish so we force a fetch of the data. + if (member->loc == dl) { + // This is a fetch inside of a write lock (that somebody else holds) but the other + // holder is likely operating on this object so this shouldn't have to hit disk. + WorkingSetCommon::fetchAndInvalidateLoc(member); + } + } + } + + PlanStage::StageState FetchStage::fetchCompleted(WorkingSetID* out) { + WorkingSetMember* member = _ws->get(_idBeingPagedIn); + + // The DiskLoc we're waiting to page in was invalidated (forced fetch). Test for + // matching and maybe pass it up. + if (member->state == WorkingSetMember::OWNED_OBJ) { + WorkingSetID memberID = _idBeingPagedIn; + _idBeingPagedIn = WorkingSet::INVALID_ID; + return returnIfMatches(member, memberID, out); + } + + // Assume that the caller has fetched appropriately. + // TODO: Do we want to double-check the runner? Not sure how reliable likelyInMemory is + // on all platforms. + verify(member->hasLoc()); + verify(!member->hasObj()); + + // Make the (unowned) object. 
+ Record* record = member->loc.rec(); + const char* data = record->dataNoThrowing(); + member->obj = BSONObj(data); + + // Don't need index data anymore as we have an obj. + member->keyData.clear(); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + verify(!member->obj.isOwned()); + + // Return the obj if it passes our filter. + WorkingSetID memberID = _idBeingPagedIn; + _idBeingPagedIn = WorkingSet::INVALID_ID; + return returnIfMatches(member, memberID, out); + } + + PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member, + WorkingSetID memberID, + WorkingSetID* out) { + if (NULL == _matcher || _matcher->matches(member)) { + *out = memberID; + return PlanStage::ADVANCED; + } + else { + _ws->free(memberID); + return PlanStage::NEED_TIME; + } + } + +} // namespace mongo diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h new file mode 100644 index 00000000000..67b6c026d43 --- /dev/null +++ b/src/mongo/db/exec/fetch.h @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/matcher.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage turns a DiskLoc into a BSONObj. 
+ * + * In WorkingSetMember terms, it transitions from LOC_AND_IDX to LOC_AND_UNOWNED_OBJ by reading + * the record at the provided loc. Returns verbatim any data that already has an object. + * + * Preconditions: Valid DiskLoc. + */ + class FetchStage : public PlanStage { + public: + FetchStage(WorkingSet* ws, PlanStage* child, Matcher* matcher); + virtual ~FetchStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + /** + * If the member (with id memberID) passes our filter, set *out to memberID and return + * ADVANCED. Otherwise, free memberID and return NEED_TIME. + */ + StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, + WorkingSetID* out); + + /** + * work(...) delegates to this when we're called after requesting a fetch. + */ + StageState fetchCompleted(WorkingSetID* out); + + // _ws is not owned by us. + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + scoped_ptr<Matcher> _matcher; + + // If we're fetching a DiskLoc and it points at something that's not in memory, we return + // a "please page this in" result and hold on to the WSID until the next call to work(...). + WorkingSetID _idBeingPagedIn; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/limit.cpp b/src/mongo/db/exec/limit.cpp new file mode 100644 index 00000000000..9f394e6f508 --- /dev/null +++ b/src/mongo/db/exec/limit.cpp @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/limit.h" + +namespace mongo { + + LimitStage::LimitStage(int limit, WorkingSet* ws, PlanStage* child) + : _ws(ws), _child(child), _numToReturn(limit) { } + + LimitStage::~LimitStage() { } + + bool LimitStage::isEOF() { return (0 == _numToReturn) || _child->isEOF(); } + + PlanStage::StageState LimitStage::work(WorkingSetID* out) { + // If we've returned as many results as we're limited to, isEOF will be true. + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + *out = id; + --_numToReturn; + return PlanStage::ADVANCED; + } + else { + // NEED_TIME, NEED_FETCH, FAILURE, IS_EOF + return status; + } + } + + void LimitStage::prepareToYield() { _child->prepareToYield(); } + + void LimitStage::recoverFromYield() { _child->recoverFromYield(); } + + void LimitStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); } + +} // namespace mongo diff --git a/src/mongo/db/exec/limit.h b/src/mongo/db/exec/limit.h new file mode 100644 index 00000000000..eb674d8deed --- /dev/null +++ b/src/mongo/db/exec/limit.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. 
If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage implements limit functionality. It only returns 'limit' results before EOF. + * + * Sort has a baked-in limit, as it can optimize the sort if it has a limit. + * + * Preconditions: None. + */ + class LimitStage : public PlanStage { + public: + LimitStage(int limit, WorkingSet* ws, PlanStage* child); + virtual ~LimitStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + + // We only return this many results. + int _numToReturn; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/mock_stage.cpp b/src/mongo/db/exec/mock_stage.cpp new file mode 100644 index 00000000000..128c93e1847 --- /dev/null +++ b/src/mongo/db/exec/mock_stage.cpp @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include "mongo/db/exec/mock_stage.h" + +namespace mongo { + + MockStage::MockStage(WorkingSet* ws) : _ws(ws) { } + + PlanStage::StageState MockStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + StageState state = _results.front(); + _results.pop(); + + if (PlanStage::ADVANCED == state) { + // We advanced. Put the mock obj into the working set. + WorkingSetID id = _ws->allocate(); + WorkingSetMember* member = _ws->get(id); + *member = _members.front(); + _members.pop(); + *out = id; + } + + return state; + } + + bool MockStage::isEOF() { return _results.empty(); } + + void MockStage::pushBack(const PlanStage::StageState state) { + _results.push(state); + } + + void MockStage::pushBack(const WorkingSetMember& member) { + _results.push(PlanStage::ADVANCED); + _members.push(member); + } + +} // namespace mongo diff --git a/src/mongo/db/exec/mock_stage.h b/src/mongo/db/exec/mock_stage.h new file mode 100644 index 00000000000..49b44fccf82 --- /dev/null +++ b/src/mongo/db/exec/mock_stage.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <queue> + +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/working_set.h" + +namespace mongo { + + class DiskLoc; + + /** + * MockStage is a data-producing stage that is used for testing. 
Unlike the other two leaf + * stages (CollectionScan and IndexScan) MockStage does not require any underlying storage + * layer. + * + * A MockStage is "programmed" by pushing return values from work() onto its internal queue. + * Calls to MockStage::work() pop values off that queue and return them in FIFO order, + * annotating the working set with data when appropriate. + */ + class MockStage : public PlanStage { + public: + MockStage(WorkingSet* ws); + virtual ~MockStage() { } + + virtual StageState work(WorkingSetID* out); + + virtual bool isEOF(); + + // These don't really mean anything here. + // Some day we could count the # of calls to the yield functions to check that other stages + // have correct yielding behavior. + virtual void prepareToYield() { } + virtual void recoverFromYield() { } + virtual void invalidate(const DiskLoc& dl) { } + + /** + * Add a result to the back of the queue. work() goes through the queue. + * Either no data is returned (just a state), or... + */ + void pushBack(const PlanStage::StageState state); + + /** + * ...data is returned (and we ADVANCED) + */ + void pushBack(const WorkingSetMember& member); + + private: + // We don't own this. + WorkingSet* _ws; + + // The data we return. + queue<PlanStage::StageState> _results; + queue<WorkingSetMember> _members; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/or.cpp b/src/mongo/db/exec/or.cpp new file mode 100644 index 00000000000..a0a678e2b95 --- /dev/null +++ b/src/mongo/db/exec/or.cpp @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/db/exec/or.h" + +namespace mongo { + + OrStage::OrStage(WorkingSet* ws, bool dedup, Matcher* matcher) + : _ws(ws), _matcher(matcher), _currentChild(0), _dedup(dedup) { } + + OrStage::~OrStage() { + for (size_t i = 0; i < _children.size(); ++i) { + delete _children[i]; + } + } + + void OrStage::addChild(PlanStage* child) { _children.push_back(child); } + + bool OrStage::isEOF() { return _currentChild >= _children.size(); } + + PlanStage::StageState OrStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState childStatus = _children[_currentChild]->work(&id); + + if (PlanStage::ADVANCED == childStatus) { + WorkingSetMember* member = _ws->get(id); + verify(member->hasLoc()); + + // If we're deduping... + if (_dedup) { + // ...and we've seen the DiskLoc before + if (_seen.end() != _seen.find(member->loc)) { + // ...drop it. + _ws->free(id); + return PlanStage::NEED_TIME; + } + else { + // Otherwise, note that we've seen it. + _seen.insert(member->loc); + } + } + + if (NULL == _matcher || _matcher->matches(member)) { + // Match! return it. + *out = id; + return PlanStage::ADVANCED; + } + else { + // Does not match, try again. + _ws->free(id); + return PlanStage::NEED_TIME; + } + } + else if (PlanStage::IS_EOF == childStatus) { + // Done with _currentChild, move to the next one. + ++_currentChild; + + // Maybe we're out of children. + if (isEOF()) { + return PlanStage::IS_EOF; + } + else { + return PlanStage::NEED_TIME; + } + } + else { + // NEED_TIME, FAILURE, NEED_FETCH, pass them up. 
+ return childStatus; + } + } + + void OrStage::prepareToYield() { + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->prepareToYield(); + } + } + + void OrStage::recoverFromYield() { + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->recoverFromYield(); + } + } + + void OrStage::invalidate(const DiskLoc& dl) { + if (isEOF()) { return; } + + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->invalidate(dl); + } + + // If we see DL again it is not the same record as it once was so we still want to + // return it. + if (_dedup) { _seen.erase(dl); } + } + +} // namespace mongo diff --git a/src/mongo/db/exec/or.h b/src/mongo/db/exec/or.h new file mode 100644 index 00000000000..8df29a0c00b --- /dev/null +++ b/src/mongo/db/exec/or.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/matcher.h" +#include "mongo/db/exec/plan_stage.h" +#include "mongo/platform/unordered_set.h" + +namespace mongo { + + /** + * This stage outputs the union of its children. It optionally deduplicates on DiskLoc. + * + * Preconditions: Valid DiskLoc. + * + * If we're deduping, we may fail to dedup any invalidated DiskLoc properly. 
+ */ + class OrStage : public PlanStage { + public: + OrStage(WorkingSet* ws, bool dedup, Matcher* matcher); + virtual ~OrStage(); + + void addChild(PlanStage* child); + + virtual bool isEOF(); + + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + // Not owned by us. + WorkingSet* _ws; + + scoped_ptr<Matcher> _matcher; + + // Owned by us. + vector<PlanStage*> _children; + + // Which of _children are we calling work(...) on now? + size_t _currentChild; + + // True if we dedup on DiskLoc, false otherwise. + bool _dedup; + + // Which DiskLocs have we returned? + unordered_set<DiskLoc, DiskLoc::Hasher> _seen; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h index c1a232b3b2c..0532ef1cecf 100644 --- a/src/mongo/db/exec/plan_stage.h +++ b/src/mongo/db/exec/plan_stage.h @@ -94,16 +94,33 @@ namespace mongo { * All possible return values of work(...) */ enum StageState { - // work(...) has returned a new result in its out parameter. + // work(...) has returned a new result in its out parameter. The caller must free it + // from the working set when done with it. ADVANCED, - // work(...) won't do anything more. isEOF() will also be true. + + // work(...) won't do anything more. isEOF() will also be true. There is nothing + // output in the out parameter. IS_EOF, - // work(...) needs more time to product a result. Call work(...) again. + + // work(...) needs more time to produce a result. Call work(...) again. There is + // nothing output in the out parameter. NEED_TIME, - // Something has gone unrecoverably wrong. Stop running this query. + + // Something has gone unrecoverably wrong. Stop running this query. There is nothing + // output in the out parameter. FAILURE, - // Something isn't in memory. Fetch it. TODO: actually support this (forthcoming). 
- NEED_YIELD, + + // Something isn't in memory. Fetch it. + // + // Full fetch semantics: + // The fetch-requesting stage populates the out parameter of work(...) with a WSID that + // refers to a WSM with a valid loc. Each stage that receives a NEED_FETCH from a child + // must propagate the NEED_FETCH up and perform no work. The plan runner is responsible + // for paging in the data upon receipt of a NEED_FETCH. The plan runner does NOT free + // the WSID of the requested fetch. The stage that requested the fetch holds the WSID + // of the loc it wants fetched. On the next call to work() that stage can assume a + // fetch was performed on the WSM that the held WSID refers to. + NEED_FETCH, }; /** diff --git a/src/mongo/db/exec/simple_plan_runner.h b/src/mongo/db/exec/simple_plan_runner.h index 34ca0b2f29a..797c3c9c9ec 100644 --- a/src/mongo/db/exec/simple_plan_runner.h +++ b/src/mongo/db/exec/simple_plan_runner.h @@ -17,6 +17,7 @@ #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pdfile.h" namespace mongo { @@ -26,7 +27,6 @@ namespace mongo { * * TODO: Yielding policy * TODO: Graceful error handling - * TODO: Fetch * TODO: Stats, diagnostics, instrumentation, etc. */ class SimplePlanRunner { @@ -59,8 +59,31 @@ namespace mongo { else if (code == PlanStage::NEED_TIME) { // TODO: Occasionally yield. For now, we run until we get another result. } + else if (PlanStage::NEED_FETCH == code) { + // id has a loc and refers to an obj we need to fetch. + WorkingSetMember* member = _workingSet.get(id); + + // This must be true for somebody to request a fetch and can only change when an + // invalidation happens, which is when we give up a lock. Don't give up the + // lock between receiving the NEED_FETCH and actually fetching(?). + verify(member->hasLoc()); + + // Actually bring record into memory. 
+ Record* record = member->loc.rec(); + record->touch(); + + // Record should be in memory now. Log if it's not. + if (!Record::likelyInPhysicalMemory(record->dataNoThrowing())) { + OCCASIONALLY { + warning() << "Record wasn't in memory immediately after fetch: " + << member->loc.toString() << endl; + } + } + + // Note that we're not freeing id. Fetch semantics say that we shouldn't. + } else { - // IS_EOF, FAILURE, NEED_YIELD. We just stop here. + // IS_EOF, FAILURE. We just stop here. return false; } } diff --git a/src/mongo/db/exec/skip.cpp b/src/mongo/db/exec/skip.cpp new file mode 100644 index 00000000000..8dfb4e33df3 --- /dev/null +++ b/src/mongo/db/exec/skip.cpp @@ -0,0 +1,58 @@ +/** +* Copyright (C) 2013 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "mongo/db/exec/skip.h" + +namespace mongo { + + SkipStage::SkipStage(int toSkip, WorkingSet* ws, PlanStage* child) + : _ws(ws), _child(child), _toSkip(toSkip) { } + + SkipStage::~SkipStage() { } + + bool SkipStage::isEOF() { return _child->isEOF(); } + + PlanStage::StageState SkipStage::work(WorkingSetID* out) { + if (isEOF()) { return PlanStage::IS_EOF; } + + WorkingSetID id; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + // If we're still skipping results... + if (_toSkip > 0) { + // ...drop the result. 
+ --_toSkip; + _ws->free(id); + return PlanStage::NEED_TIME; + } + + *out = id; + return PlanStage::ADVANCED; + } + else { + // NEED_TIME, NEED_FETCH, FAILURE, IS_EOF + return status; + } + } + + void SkipStage::prepareToYield() { _child->prepareToYield(); } + + void SkipStage::recoverFromYield() { _child->recoverFromYield(); } + + void SkipStage::invalidate(const DiskLoc& dl) { _child->invalidate(dl); } + +} // namespace mongo diff --git a/src/mongo/db/exec/skip.h b/src/mongo/db/exec/skip.h new file mode 100644 index 00000000000..5086e9d66aa --- /dev/null +++ b/src/mongo/db/exec/skip.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mongo/db/diskloc.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { + + /** + * This stage implements skip functionality. It drops the first 'toSkip' results from its child + * then returns the rest verbatim. + * + * Preconditions: None. 
+ */ + class SkipStage : public PlanStage { + public: + SkipStage(int toSkip, WorkingSet* ws, PlanStage* child); + virtual ~SkipStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void prepareToYield(); + virtual void recoverFromYield(); + virtual void invalidate(const DiskLoc& dl); + + private: + WorkingSet* _ws; + scoped_ptr<PlanStage> _child; + + // We drop the first _toSkip results that we would have returned. + int _toSkip; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index e15e8a96489..d55a9dd497b 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -20,7 +20,11 @@ #include "mongo/db/commands.h" #include "mongo/db/exec/and_hash.h" #include "mongo/db/exec/and_sorted.h" +#include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" +#include "mongo/db/exec/limit.h" +#include "mongo/db/exec/or.h" +#include "mongo/db/exec/skip.h" #include "mongo/db/exec/simple_plan_runner.h" #include "mongo/db/index/catalog_hack.h" #include "mongo/db/jsobj.h" @@ -49,16 +53,16 @@ namespace mongo { * * node -> {andHash: {filter: {filter}, args: { nodes: [node, node]}}} * node -> {andSorted: {filter: {filter}, args: { nodes: [node, node]}}} + * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}} + * node -> {fetch: {filter: {filter}, args: {node: node}}} + * node -> {limit: {args: {node: node, num: posint}}} + * node -> {skip: {args: {node: node, num: posint}}} * * Forthcoming Nodes: * * node -> {cscan: {filter: {filter}, args: {name: "collectionname" }}} - * node -> {or: {filter: {filter}, args: { dedup:bool, nodes:[node, node]}}} - * node -> {fetch: {filter: {filter}, args: {node: node}}} * node -> {sort: {filter: {filter}, args: {node: node, pattern: objWithSortCriterion}}} * node -> {dedup: {filter: {filter}, args: {node: node, field: field}}} - * node -> {limit: {filter: filter}, args: {node: 
node, num: posint}} - * node -> {skip: {filter: filter}, args: {node: node, num: posint}} * node -> {unwind: {filter: filter}, args: {node: node, field: field}} */ class StageDebugCmd : public Command { @@ -203,6 +207,52 @@ namespace mongo { return andStage.release(); } + else if ("or" == nodeName) { + uassert(16934, "Nodes argument must be provided to AND", + nodeArgs["nodes"].isABSONObj()); + uassert(16935, "Dedup argument must be provided to OR", + !nodeArgs["dedup"].eoo()); + BSONObjIterator it(nodeArgs["nodes"].Obj()); + auto_ptr<OrStage> orStage(new OrStage(workingSet, nodeArgs["dedup"].Bool(), + matcher.release())); + while (it.more()) { + BSONElement e = it.next(); + if (!e.isABSONObj()) { return NULL; } + PlanStage* subNode = parseQuery(dbname, e.Obj(), workingSet); + uassert(16936, "Can't parse sub-node of OR: " + e.Obj().toString(), + NULL != subNode); + // takes ownership + orStage->addChild(subNode); + } + + return orStage.release(); + } + else if ("fetch" == nodeName) { + uassert(16929, "Node argument must be provided to fetch", + nodeArgs["node"].isABSONObj()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new FetchStage(workingSet, subNode, matcher.release()); + } + else if ("limit" == nodeName) { + uassert(16937, "Limit stage doesn't have a filter (put it on the child)", + NULL == matcher.get()); + uassert(16930, "Node argument must be provided to limit", + nodeArgs["node"].isABSONObj()); + uassert(16931, "Num argument must be provided to limit", + nodeArgs["num"].isNumber()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new LimitStage(nodeArgs["num"].numberInt(), workingSet, subNode); + } + else if ("skip" == nodeName) { + uassert(16938, "Skip stage doesn't have a filter (put it on the child)", + NULL == matcher.get()); + uassert(16932, "Node argument must be provided to skip", + nodeArgs["node"].isABSONObj()); + uassert(16933, "Num argument must be provided to 
skip", + nodeArgs["num"].isNumber()); + PlanStage* subNode = parseQuery(dbname, nodeArgs["node"].Obj(), workingSet); + return new SkipStage(nodeArgs["num"].numberInt(), workingSet, subNode); + } else { return NULL; } |