diff options
author | Benety Goh <benety@mongodb.com> | 2018-10-25 13:37:44 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-10-25 13:37:44 -0400 |
commit | 7d8df4c73934ef757e4007743ab6b69e3e80b7c2 (patch) | |
tree | c2711ed6a864b24c7c4c0de1b8c5347a87388244 /src | |
parent | b9db97a9ec5644c00db39a02813e82daa814b692 (diff) | |
download | mongo-7d8df4c73934ef757e4007743ab6b69e3e80b7c2.tar.gz |
SERVER-37589 split PlanExecutor into interface and implementation
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stats.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stats_test.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 235 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor_impl.cpp (renamed from src/mongo/db/query/plan_executor.cpp) | 202 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor_impl.h | 189 |
6 files changed, 357 insertions, 431 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 079ac5467ad..298c0c9559e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1044,7 +1044,7 @@ env.Library( 'pipeline/pipeline_d.cpp', 'query/get_executor.cpp', 'query/internal_plans.cpp', - 'query/plan_executor.cpp', + 'query/plan_executor_impl.cpp', 'query/plan_ranker.cpp', 'query/plan_yield_policy.cpp', 'query/query_yield.cpp', diff --git a/src/mongo/db/exec/plan_stats.cpp b/src/mongo/db/exec/plan_stats.cpp deleted file mode 100644 index 9d63e979267..00000000000 --- a/src/mongo/db/exec/plan_stats.cpp +++ /dev/null @@ -1,51 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/exec/plan_stats.h" -#include "mongo/db/jsobj.h" - -namespace mongo { - -void CommonStats::writeExplainTo(BSONObjBuilder* bob) const { - if (NULL == bob) { - return; - } - // potential overflow because original counters are unsigned 64-bit values - bob->append("works", static_cast<long long>(works)); - bob->append("advanced", static_cast<long long>(advanced)); -} - -// forward to CommonStats for now -// TODO: fill in specific stats -void PlanStageStats::writeExplainTo(BSONObjBuilder* bob) const { - common.writeExplainTo(bob); -} - -} // namespace mongo diff --git a/src/mongo/db/exec/plan_stats_test.cpp b/src/mongo/db/exec/plan_stats_test.cpp deleted file mode 100644 index 4846a2f6e98..00000000000 --- a/src/mongo/db/exec/plan_stats_test.cpp +++ /dev/null @@ -1,109 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -/** - * This file contains tests for mongo/db/exec/plan_stats.h - */ - -#include "mongo/db/exec/plan_stats.h" -#include "mongo/db/jsobj.h" -#include "mongo/unittest/unittest.h" - -using namespace mongo; - -namespace { - -/** - * Basic test on field initializers - */ -TEST(CommonStatsTest, defaultValues) { - CommonStats stats; - ASSERT_EQUALS(stats.works, static_cast<size_t>(0)); - ASSERT_EQUALS(stats.yields, static_cast<size_t>(0)); - ASSERT_EQUALS(stats.invalidates, static_cast<size_t>(0)); - ASSERT_EQUALS(stats.advanced, static_cast<size_t>(0)); - ASSERT_EQUALS(stats.needTime, static_cast<size_t>(0)); - ASSERT_EQUALS(stats.needYield, static_cast<size_t>(0)); - ASSERT_FALSE(stats.isEOF); -} - -/** - * Verifies null argument check in CommonStats::writeExplainTo - */ -TEST(CommonStatsTest, writeExplainToNullBuilder) { - CommonStats stats; - stats.writeExplainTo(NULL); -} - -/** - * Verifies null argument check in PlanStageStats::writeExplainTo - */ -TEST(PlanStageStatsTest, writeExplainToNullBuilder) { - CommonStats stats; - PlanStageStats pss(stats); - pss.writeExplainTo(NULL); -} - -/** - * Checks BSON output of CommonStats::writeExplainTo to ensure it contains - * correct values for CommonStats fields - */ -TEST(CommonStatsTest, writeExplainTo) { - CommonStats stats; - stats.works = static_cast<size_t>(2); - stats.advanced = static_cast<size_t>(3); - BSONObjBuilder bob; - stats.writeExplainTo(&bob); - BSONObj obj = bob.done(); - ASSERT_TRUE(obj.hasField("works")); - ASSERT_EQUALS(obj.getIntField("works"), 2); - ASSERT_TRUE(obj.hasField("advanced")); - ASSERT_EQUALS(obj.getIntField("advanced"), 3); -} - -/** - * Checks BSON output of PlanStageStats::writeExplainTo to ensure it contains - * correct values for CommonStats fields - */ -TEST(PlanStageStatsTest, writeExplainTo) { - CommonStats stats; - stats.works = static_cast<size_t>(2); - stats.advanced = static_cast<size_t>(3); - BSONObjBuilder bob; - PlanStageStats pss(stats); - pss.writeExplainTo(&bob); - BSONObj obj = bob.done(); - ASSERT_TRUE(obj.hasField("works")); - ASSERT_EQUALS(obj.getIntField("works"), 2); - ASSERT_TRUE(obj.hasField("advanced")); - ASSERT_EQUALS(obj.getIntField("advanced"), 3); -} - -} // namespace diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index b789a0cf338..9c97d30903d 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -31,7 +31,6 @@ #pragma once #include <boost/optional.hpp> -#include <queue> #include "mongo/base/status.h" #include "mongo/db/catalog/util/partitioned.h" @@ -138,6 +137,13 @@ public: }; /** + * RegistrationToken is the type of key used to register this PlanExecutor with the + * CursorManager. + */ + using RegistrationToken = + boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId>; + + /** * This class will ensure a PlanExecutor is disposed before it is deleted. */ class Deleter { @@ -146,9 +152,10 @@ public: * Constructs an empty deleter. Useful for creating a * unique_ptr<PlanExecutor, PlanExecutor::Deleter> without populating it. */ - Deleter() {} + Deleter() = default; - Deleter(OperationContext* opCtx, const Collection* collection); + inline Deleter(OperationContext* opCtx, CursorManager* cursorManager) + : _opCtx(opCtx), _cursorManager(cursorManager) {} /** * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume @@ -164,7 +171,19 @@ public: * been registered with the CursorManager, will deregister it. If 'execPtr' is a yielding * PlanExecutor, callers must hold a lock on the collection in at least MODE_IS. */ - void operator()(PlanExecutor* execPtr); + inline void operator()(PlanExecutor* execPtr) { + try { + // It is illegal to invoke operator() on a default constructed Deleter. + invariant(_opCtx); + if (!_dismissed) { + execPtr->dispose(_opCtx, _cursorManager); + } + delete execPtr; + } catch (...) { + std::terminate(); + } + } + private: OperationContext* _opCtx = nullptr; @@ -238,6 +257,13 @@ public: const Collection* collection, YieldPolicy yieldPolicy); + /** + * A PlanExecutor must be disposed before destruction. In most cases, this will happen + * automatically through a PlanExecutor::Deleter or a ClientCursor. + */ + PlanExecutor() = default; + virtual ~PlanExecutor() = default; + // // Accessors // @@ -245,38 +271,27 @@ public: /** * Get the working set used by this executor, without transferring ownership. */ - WorkingSet* getWorkingSet() const; + virtual WorkingSet* getWorkingSet() const = 0; /** * Get the stage tree wrapped by this executor, without transferring ownership. */ - PlanStage* getRootStage() const; + virtual PlanStage* getRootStage() const = 0; /** * Get the query that this executor is executing, without transferring ownership. */ - CanonicalQuery* getCanonicalQuery() const; + virtual CanonicalQuery* getCanonicalQuery() const = 0; /** * Return the NS that the query is running over. */ - const NamespaceString& nss() const { - return _nss; - } + virtual const NamespaceString& nss() const = 0; /** * Return the OperationContext that the plan is currently executing within. */ - OperationContext* getOpCtx() const; - - /** - * Generates a tree of stats objects with a separate lifetime from the execution - * stage tree wrapped by this PlanExecutor. - * - * This may be called without holding any locks. It also may be called on a PlanExecutor that - * has been killed or has produced an error. - */ - std::unique_ptr<PlanStageStats> getStats() const; + virtual OperationContext* getOpCtx() const = 0; // // Methods that just pass down to the PlanStage tree. @@ -288,7 +303,7 @@ public: * While in the "saved" state, it is only legal to call restoreState, * detachFromOperationContext, or the destructor. */ - void saveState(); + virtual void saveState() = 0; /** * Restores the state saved by a saveState() call. @@ -304,7 +319,7 @@ public: * this scenario, locks will have been released, and will not be held when control returns to * the caller. */ - Status restoreState(); + virtual Status restoreState() = 0; /** * Detaches from the OperationContext and releases any storage-engine state. @@ -313,7 +328,7 @@ public: * only legal to call reattachToOperationContext or the destructor. It is not legal to call * detachFromOperationContext() while already in the detached state. */ - void detachFromOperationContext(); + virtual void detachFromOperationContext() = 0; /** * Reattaches to the OperationContext and reacquires any storage-engine state. @@ -321,7 +336,7 @@ public: * It is only legal to call this in the "detached" state. On return, the cursor is left in a * "saved" state, so callers must still call restoreState to use this object. */ - void reattachToOperationContext(OperationContext* opCtx); + virtual void reattachToOperationContext(OperationContext* opCtx) = 0; /** * Same as restoreState but without the logic to retry if a WriteConflictException is @@ -329,7 +344,7 @@ public: * * This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE. */ - Status restoreStateWithoutRetrying(); + virtual Status restoreStateWithoutRetrying() = 0; // // Running Support @@ -344,9 +359,9 @@ public: * * If a YIELD_AUTO policy is set, then this method may yield. */ - ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut); + virtual ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) = 0; - ExecState getNext(BSONObj* objOut, RecordId* dlOut); + virtual ExecState getNext(BSONObj* objOut, RecordId* dlOut) = 0; /** * Returns 'true' if the plan is done producing results (or writing), 'false' otherwise. @@ -354,7 +369,7 @@ public: * Tailable cursors are a possible exception to this: they may have further results even if * isEOF() returns true. */ - bool isEOF(); + virtual bool isEOF() = 0; /** * Execute the plan to completion, throwing out the results. Used when you want to work the @@ -366,7 +381,7 @@ public: * error occurs, it is illegal to subsequently access the collection, since it may have been * dropped. */ - Status executePlan(); + virtual Status executePlan() = 0; // // Concurrency-related methods. @@ -380,7 +395,7 @@ public: * method is called multiple times, only the first 'killStatus' will be retained. It is an error * to call this method with Status::OK. */ - void markAsKilled(Status killStatus); + virtual void markAsKilled(Status killStatus) = 0; /** * Cleans up any state associated with this PlanExecutor. Must be called before deleting this @@ -395,7 +410,7 @@ public: * is the owner's responsibility to call dispose() with a valid OperationContext before * deleting the PlanExecutor. */ - void dispose(OperationContext* opCtx, CursorManager* cursorManager); + virtual void dispose(OperationContext* opCtx, CursorManager* cursorManager) = 0; /** * Helper method to aid in displaying an ExecState for debug or other recreational purposes. @@ -414,173 +429,39 @@ public: * If used in combination with getNextSnapshotted(), then the SnapshotId associated with * 'obj' will be null when 'obj' is dequeued. */ - void enqueue(const BSONObj& obj); + virtual void enqueue(const BSONObj& obj) = 0; /** * Helper method which returns a set of BSONObj, where each represents a sort order of our * output. */ - BSONObjSet getOutputSorts() const; + virtual BSONObjSet getOutputSorts() const = 0; /** * Communicate to this PlanExecutor that it is no longer registered with the CursorManager as a * 'non-cached PlanExecutor'. */ - void unsetRegistered() { - _registrationToken.reset(); - } - - boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId> - getRegistrationToken() const& { - return _registrationToken; - } + virtual void unsetRegistered() = 0; + virtual RegistrationToken getRegistrationToken() const& = 0; void getRegistrationToken() && = delete; + virtual void setRegistrationToken(RegistrationToken token) & = 0; - void setRegistrationToken( - Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId token) & { - invariant(!_registrationToken); - _registrationToken = token; - } - - bool isMarkedAsKilled() const { - return !_killStatus.isOK(); - } - - Status getKillStatus() { - invariant(isMarkedAsKilled()); - return _killStatus; - } + virtual bool isMarkedAsKilled() const = 0; + virtual Status getKillStatus() = 0; - bool isDisposed() const { - return _currentState == kDisposed; - } - - bool isDetached() const { - return _currentState == kDetached; - } + virtual bool isDisposed() const = 0; + virtual bool isDetached() const = 0; /** * If the last oplog timestamp is being tracked for this PlanExecutor, return it. * Otherwise return a null timestamp. */ - Timestamp getLatestOplogTimestamp(); - -private: - /** - * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called - * on a tailable and awaitData cursor that still has time left and hasn't been interrupted. - */ - bool shouldListenForInserts(); - - /** - * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore - * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF - * should be returned immediately. - */ - bool shouldWaitForInserts(); - - /** - * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor - * is not capable of yielding based on a notifier. - */ - std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(); - - /** - * Yields locks and waits for inserts to the collection. Returns ADVANCED if there has been an - * insertion and there may be new results. Returns DEAD if the PlanExecutor was killed during a - * yield. This method is only to be used for tailable and awaitData cursors, so rather than - * returning DEAD if the operation has exceeded its time limit, we return IS_EOF to preserve - * this PlanExecutor for future use. - * - * If an error is encountered and 'errorObj' is provided, it is populated with an object - * describing the error. - */ - ExecState waitForInserts(CappedInsertNotifierData* notifierData, - Snapshotted<BSONObj>* errorObj); - - ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); + virtual Timestamp getLatestOplogTimestamp() = 0; /** - * New PlanExecutor instances are created with the static make() methods above. + * Turns a BSONObj representing an error status produced by getNext() into a Status. */ - PlanExecutor(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy); - - /** - * A PlanExecutor must be disposed before destruction. In most cases, this will happen - * automatically through a PlanExecutor::Deleter or a ClientCursor. - */ - ~PlanExecutor(); - - /** - * Public factory methods delegate to this private factory to do their work. - */ - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy); - - /** - * Clients of PlanExecutor expect that on receiving a new instance from one of the make() - * factory methods, plan selection has already been completed. In order to enforce this - * property, this function is called to do plan selection prior to returning the new - * PlanExecutor. - * - * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage, - * this calls into their underlying plan selection facilities. Otherwise, does nothing. - * - * If a YIELD_AUTO policy is set then locks are yielded during plan selection. - * - * Returns a non-OK status if query planning fails. In particular, this function returns - * ErrorCodes::QueryPlanKilled if plan execution cannot proceed due to a concurrent write or - * catalog operation. - */ - Status pickBestPlan(const Collection* collection); - - // The OperationContext that we're executing within. This can be updated if necessary by using - // detachFromOperationContext() and reattachToOperationContext(). - OperationContext* _opCtx; - - std::unique_ptr<CanonicalQuery> _cq; - std::unique_ptr<WorkingSet> _workingSet; - std::unique_ptr<QuerySolution> _qs; - std::unique_ptr<PlanStage> _root; - - // If _killStatus has a non-OK value, then we have been killed and the value represents the - // reason for the kill. - Status _killStatus = Status::OK(); - - // What namespace are we operating over? - NamespaceString _nss; - - // This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL. - // TODO make this a non-pointer member. This requires some header shuffling so that this - // file includes plan_yield_policy.h rather than the other way around. - const std::unique_ptr<PlanYieldPolicy> _yieldPolicy; - - // A stash of results generated by this plan that the user of the PlanExecutor didn't want - // to consume yet. We empty the queue before retrieving further results from the plan - // stages. - std::queue<BSONObj> _stash; - - enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; - - // Set if this PlanExecutor is registered with the CursorManager. - boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId> - _registrationToken; - - bool _everDetachedFromOperationContext = false; + virtual Status getMemberObjectStatus(const BSONObj& memberObj) const = 0; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 111fa4b376f..037df73f515 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_impl.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" @@ -132,7 +132,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( unique_ptr<PlanStage> rt, const Collection* collection, YieldPolicy yieldPolicy) { - return PlanExecutor::make( + return PlanExecutorImpl::make( opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy); } @@ -143,14 +143,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( unique_ptr<PlanStage> rt, NamespaceString nss, YieldPolicy yieldPolicy) { - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(rt), - nullptr, - nullptr, - nullptr, - std::move(nss), - yieldPolicy); + return PlanExecutorImpl::make(opCtx, + std::move(ws), + std::move(rt), + nullptr, + nullptr, + nullptr, + std::move(nss), + yieldPolicy); } // static @@ -161,7 +161,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( unique_ptr<CanonicalQuery> cq, const Collection* collection, YieldPolicy yieldPolicy) { - return PlanExecutor::make( + return PlanExecutorImpl::make( opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy); } @@ -174,18 +174,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( unique_ptr<CanonicalQuery> cq, const Collection* collection, YieldPolicy yieldPolicy) { - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - collection, - {}, - yieldPolicy); + return PlanExecutorImpl::make(opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + collection, + {}, + yieldPolicy); } // static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutorImpl::make( OperationContext* opCtx, unique_ptr<WorkingSet> ws, unique_ptr<PlanStage> rt, @@ -195,19 +195,19 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( NamespaceString nss, YieldPolicy yieldPolicy) { - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec( - new PlanExecutor(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - collection, - std::move(nss), - yieldPolicy), - PlanExecutor::Deleter(opCtx, collection)); + auto execImpl = new PlanExecutorImpl(opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + collection, + std::move(nss), + yieldPolicy); + PlanExecutor::Deleter planDeleter(opCtx, collection ? collection->getCursorManager() : nullptr); + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec(execImpl, std::move(planDeleter)); // Perform plan selection, if necessary. - Status status = exec->pickBestPlan(collection); + Status status = execImpl->_pickBestPlan(); if (!status.isOK()) { return status; } @@ -215,14 +215,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( return std::move(exec); } -PlanExecutor::PlanExecutor(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy) +PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + unique_ptr<QuerySolution> qs, + unique_ptr<CanonicalQuery> cq, + const Collection* collection, + NamespaceString nss, + YieldPolicy yieldPolicy) : _opCtx(opCtx), _cq(std::move(cq)), _workingSet(std::move(ws)), @@ -247,7 +247,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, } } -Status PlanExecutor::pickBestPlan(const Collection* collection) { +Status PlanExecutorImpl::_pickBestPlan() { invariant(_currentState == kUsable); // First check if we need to do subplanning. @@ -278,7 +278,7 @@ Status PlanExecutor::pickBestPlan(const Collection* collection) { return Status::OK(); } -PlanExecutor::~PlanExecutor() { +PlanExecutorImpl::~PlanExecutorImpl() { invariant(_currentState == kDisposed); } @@ -296,23 +296,23 @@ string PlanExecutor::statestr(ExecState s) { } } -WorkingSet* PlanExecutor::getWorkingSet() const { +WorkingSet* PlanExecutorImpl::getWorkingSet() const { return _workingSet.get(); } -PlanStage* PlanExecutor::getRootStage() const { +PlanStage* PlanExecutorImpl::getRootStage() const { return _root.get(); } -CanonicalQuery* PlanExecutor::getCanonicalQuery() const { +CanonicalQuery* PlanExecutorImpl::getCanonicalQuery() const { return _cq.get(); } -unique_ptr<PlanStageStats> PlanExecutor::getStats() const { - return _root->getStats(); +const NamespaceString& PlanExecutorImpl::nss() const { + return _nss; } -BSONObjSet PlanExecutor::getOutputSorts() const { +BSONObjSet PlanExecutorImpl::getOutputSorts() const { if (_qs && _qs->root) { _qs->root->computeProperties(); return _qs->root->getSort(); @@ -337,11 +337,11 @@ BSONObjSet PlanExecutor::getOutputSorts() const { return SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } -OperationContext* PlanExecutor::getOpCtx() const { +OperationContext* PlanExecutorImpl::getOpCtx() const { return _opCtx; } -void PlanExecutor::saveState() { +void PlanExecutorImpl::saveState() { invariant(_currentState == kUsable || _currentState == kSaved); // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear, @@ -355,7 +355,7 @@ void PlanExecutor::saveState() { _currentState = kSaved; } -Status PlanExecutor::restoreState() { +Status PlanExecutorImpl::restoreState() { try { return restoreStateWithoutRetrying(); } catch (const WriteConflictException&) { @@ -367,7 +367,7 @@ Status PlanExecutor::restoreState() { } } -Status PlanExecutor::restoreStateWithoutRetrying() { +Status PlanExecutorImpl::restoreStateWithoutRetrying() { invariant(_currentState == kSaved); if (!isMarkedAsKilled()) { @@ -378,7 +378,7 @@ Status PlanExecutor::restoreStateWithoutRetrying() { return _killStatus; } -void PlanExecutor::detachFromOperationContext() { +void PlanExecutorImpl::detachFromOperationContext() { invariant(_currentState == kSaved); _opCtx = nullptr; _root->detachFromOperationContext(); @@ -386,7 +386,7 @@ void PlanExecutor::detachFromOperationContext() { _everDetachedFromOperationContext = true; } -void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) { +void PlanExecutorImpl::reattachToOperationContext(OperationContext* opCtx) { invariant(_currentState == kDetached); // We're reattaching for a getMore now. Reset the yield timer in order to prevent from @@ -398,9 +398,9 @@ void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) { _currentState = kSaved; } -PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) { +PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) { Snapshotted<BSONObj> snapshotted; - ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut); + ExecState state = _getNextImpl(objOut ? &snapshotted : NULL, dlOut); if (objOut) { *objOut = snapshotted.value(); @@ -409,24 +409,24 @@ PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) return state; } -PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut, - RecordId* dlOut) { +PlanExecutor::ExecState PlanExecutorImpl::getNextSnapshotted(Snapshotted<BSONObj>* objOut, + RecordId* dlOut) { // Detaching from the OperationContext means that the returned snapshot ids could be invalid. invariant(!_everDetachedFromOperationContext); - return getNextImpl(objOut, dlOut); + return _getNextImpl(objOut, dlOut); } -bool PlanExecutor::shouldListenForInserts() { +bool PlanExecutorImpl::_shouldListenForInserts() { return _cq && _cq->getQueryRequest().isTailableAndAwaitData() && awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && awaitDataState(_opCtx).waitForInsertsDeadline > _opCtx->getServiceContext()->getPreciseClockSource()->now(); } -bool PlanExecutor::shouldWaitForInserts() { +bool PlanExecutorImpl::_shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. - if (shouldListenForInserts()) { + if (_shouldListenForInserts()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -443,7 +443,7 @@ bool PlanExecutor::shouldWaitForInserts() { return false; } -std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() { +std::shared_ptr<CappedInsertNotifier> PlanExecutorImpl::_getCappedInsertNotifier() { // We don't expect to need a capped insert notifier for non-yielding plans. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -458,8 +458,8 @@ std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() { return collection->getCappedInsertNotifier(); } -PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData, - Snapshotted<BSONObj>* errorObj) { +PlanExecutor::ExecState PlanExecutorImpl::_waitForInserts(CappedInsertNotifierData* notifierData, + Snapshotted<BSONObj>* errorObj) { invariant(notifierData->notifier); // The notifier wait() method will not wait unless the version passed to it matches the @@ -490,7 +490,8 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n return DEAD; } -PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { +PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<BSONObj>* objOut, + RecordId* dlOut) { if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { Status status(ErrorCodes::InternalError, str::stream() << "PlanExecutor hit planExecutorAlwaysFails fail point"); @@ -523,9 +524,9 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped // insert notifier is necessary for the notifierVersion to advance. CappedInsertNotifierData cappedInsertNotifierData; - if (shouldListenForInserts()) { + if (_shouldListenForInserts()) { // We always construct the CappedInsertNotifier for awaitData cursors. - cappedInsertNotifierData.notifier = getCappedInsertNotifier(); + cappedInsertNotifierData.notifier = _getCappedInsertNotifier(); } for (;;) { // These are the conditions which can cause us to yield: @@ -609,10 +610,10 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, "enabled. Blocking until fail point is disabled."; MONGO_FAIL_POINT_PAUSE_WHILE_SET(planExecutorHangBeforeShouldWaitForInserts); } - if (!shouldWaitForInserts()) { + if (!_shouldWaitForInserts()) { return PlanExecutor::IS_EOF; } - const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut); + const ExecState waitResult = _waitForInserts(&cappedInsertNotifierData, objOut); if (waitResult == PlanExecutor::ADVANCED) { // There may be more results, keep going. continue; @@ -633,12 +634,12 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } } -bool PlanExecutor::isEOF() { +bool PlanExecutorImpl::isEOF() { invariant(_currentState == kUsable); return isMarkedAsKilled() || (_stash.empty() && _root->isEOF()); } -void PlanExecutor::markAsKilled(Status killStatus) { +void PlanExecutorImpl::markAsKilled(Status killStatus) { invariant(!killStatus.isOK()); // If killed multiple times, only retain the first status. if (_killStatus.isOK()) { @@ -646,7 +647,7 @@ void PlanExecutor::markAsKilled(Status killStatus) { } } -void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager) { +void PlanExecutorImpl::dispose(OperationContext* opCtx, CursorManager* cursorManager) { if (_currentState == kDisposed) { return; } @@ -664,7 +665,7 @@ void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager _currentState = kDisposed; } -Status PlanExecutor::executePlan() { +Status PlanExecutorImpl::executePlan() { invariant(_currentState == kUsable); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; @@ -677,7 +678,7 @@ Status PlanExecutor::executePlan() { return _killStatus; } - auto errorStatus = WorkingSetCommon::getMemberObjectStatus(obj); + auto errorStatus = getMemberObjectStatus(obj); invariant(!errorStatus.isOK()); return errorStatus.withContext(str::stream() << "Exec error resulting in state " << PlanExecutor::statestr(state)); @@ -689,11 +690,41 @@ Status PlanExecutor::executePlan() { } -void PlanExecutor::enqueue(const BSONObj& obj) { +void PlanExecutorImpl::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } -Timestamp PlanExecutor::getLatestOplogTimestamp() { +void PlanExecutorImpl::unsetRegistered() { + _registrationToken.reset(); +} + +PlanExecutor::RegistrationToken PlanExecutorImpl::getRegistrationToken() const& { + return _registrationToken; +} + +void PlanExecutorImpl::setRegistrationToken(RegistrationToken token)& { + invariant(!_registrationToken); + _registrationToken = token; +} + +bool PlanExecutorImpl::isMarkedAsKilled() const { + return !_killStatus.isOK(); +} + +Status PlanExecutorImpl::getKillStatus() { + invariant(isMarkedAsKilled()); + return _killStatus; +} + +bool PlanExecutorImpl::isDisposed() const { + return _currentState == kDisposed; +} + +bool PlanExecutorImpl::isDetached() const { + return _currentState == kDetached; +} + +Timestamp PlanExecutorImpl::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) @@ -701,23 +732,8 @@ Timestamp PlanExecutor::getLatestOplogTimestamp() { return Timestamp(); } -// -// PlanExecutor::Deleter -// - -PlanExecutor::Deleter::Deleter(OperationContext* opCtx, const Collection* collection) - : _opCtx(opCtx), _cursorManager(collection ? collection->getCursorManager() : nullptr) {} - -void PlanExecutor::Deleter::operator()(PlanExecutor* execPtr) { - try { - invariant(_opCtx); // It is illegal to invoke operator() on a default constructed Deleter. - if (!_dismissed) { - execPtr->dispose(_opCtx, _cursorManager); - } - delete execPtr; - } catch (...) { - std::terminate(); - } +Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const { + return WorkingSetCommon::getMemberObjectStatus(memberObj); } } // namespace mongo diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h new file mode 100644 index 00000000000..6447d014cdc --- /dev/null +++ b/src/mongo/db/query/plan_executor_impl.h @@ -0,0 +1,189 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <queue> + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/query/plan_executor.h" + +namespace mongo { + +class PlanExecutorImpl : public PlanExecutor { + MONGO_DISALLOW_COPYING(PlanExecutorImpl); + +public: + /** + * Public factory methods delegate to this impl factory to do their work. + */ + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const Collection* collection, + NamespaceString nss, + YieldPolicy yieldPolicy); + + virtual ~PlanExecutorImpl(); + WorkingSet* getWorkingSet() const final; + PlanStage* getRootStage() const final; + CanonicalQuery* getCanonicalQuery() const final; + const NamespaceString& nss() const final; + OperationContext* getOpCtx() const final; + void saveState() final; + Status restoreState() final; + void detachFromOperationContext() final; + void reattachToOperationContext(OperationContext* opCtx) final; + Status restoreStateWithoutRetrying() final; + ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) final; + ExecState getNext(BSONObj* objOut, RecordId* dlOut) final; + bool isEOF() final; + Status executePlan() final; + void markAsKilled(Status killStatus) final; + void dispose(OperationContext* opCtx, CursorManager* cursorManager) final; + void enqueue(const BSONObj& obj) final; + BSONObjSet getOutputSorts() const final; + void unsetRegistered() final; + RegistrationToken getRegistrationToken() const&; + void setRegistrationToken(RegistrationToken token) & final; + bool isMarkedAsKilled() const final; + Status getKillStatus() final; + bool isDisposed() const final; + bool isDetached() const final; + Timestamp getLatestOplogTimestamp() final; + Status getMemberObjectStatus(const BSONObj& memberObj) const final; + +private: + /** + * New PlanExecutor instances are created with the static make() method above. + */ + PlanExecutorImpl(OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const Collection* collection, + NamespaceString nss, + YieldPolicy yieldPolicy); + + /** + * Clients of PlanExecutor expect that on receiving a new instance from one of the make() + * factory methods, plan selection has already been completed. In order to enforce this + * property, this function is called to do plan selection prior to returning the new + * PlanExecutor. + * + * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage, + * this calls into their underlying plan selection facilities. Otherwise, does nothing. + * + * If a YIELD_AUTO policy is set then locks are yielded during plan selection. + * + * Returns a non-OK status if query planning fails. In particular, this function returns + * ErrorCodes::QueryPlanKilled if plan execution cannot proceed due to a concurrent write or + * catalog operation. + */ + Status _pickBestPlan(); + + /** + * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called + * on a tailable and awaitData cursor that still has time left and hasn't been interrupted. + */ + bool _shouldListenForInserts(); + + /** + * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore + * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF + * should be returned immediately. + */ + bool _shouldWaitForInserts(); + + /** + * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor + * is not capable of yielding based on a notifier. + */ + std::shared_ptr<CappedInsertNotifier> _getCappedInsertNotifier(); + + /** + * Yields locks and waits for inserts to the collection. Returns ADVANCED if there has been an + * insertion and there may be new results. Returns DEAD if the PlanExecutor was killed during a + * yield. This method is only to be used for tailable and awaitData cursors, so rather than + * returning DEAD if the operation has exceeded its time limit, we return IS_EOF to preserve + * this PlanExecutor for future use. + * + * If an error is encountered and 'errorObj' is provided, it is populated with an object + * describing the error. + */ + ExecState _waitForInserts(CappedInsertNotifierData* notifierData, + Snapshotted<BSONObj>* errorObj); + + /** + * Common implementation for getNext() and getNextSnapshotted(). + */ + ExecState _getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); + + // The OperationContext that we're executing within. This can be updated if necessary by using + // detachFromOperationContext() and reattachToOperationContext(). + OperationContext* _opCtx; + + std::unique_ptr<CanonicalQuery> _cq; + std::unique_ptr<WorkingSet> _workingSet; + std::unique_ptr<QuerySolution> _qs; + std::unique_ptr<PlanStage> _root; + + // If _killStatus has a non-OK value, then we have been killed and the value represents the + // reason for the kill. + Status _killStatus = Status::OK(); + + // What namespace are we operating over? + NamespaceString _nss; + + // This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL. + // TODO make this a non-pointer member. This requires some header shuffling so that this + // file includes plan_yield_policy.h rather than the other way around. + const std::unique_ptr<PlanYieldPolicy> _yieldPolicy; + + // A stash of results generated by this plan that the user of the PlanExecutor didn't want + // to consume yet. We empty the queue before retrieving further results from the plan + // stages. + std::queue<BSONObj> _stash; + + enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; + + // Set if this PlanExecutor is registered with the CursorManager. + boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId> + _registrationToken; + + bool _everDetachedFromOperationContext = false; +}; + +} // namespace mongo |