summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/exec/plan_stats.cpp51
-rw-r--r--src/mongo/db/exec/plan_stats_test.cpp109
-rw-r--r--src/mongo/db/query/plan_executor.h235
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp (renamed from src/mongo/db/query/plan_executor.cpp)202
-rw-r--r--src/mongo/db/query/plan_executor_impl.h189
6 files changed, 357 insertions, 431 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 079ac5467ad..298c0c9559e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1044,7 +1044,7 @@ env.Library(
'pipeline/pipeline_d.cpp',
'query/get_executor.cpp',
'query/internal_plans.cpp',
- 'query/plan_executor.cpp',
+ 'query/plan_executor_impl.cpp',
'query/plan_ranker.cpp',
'query/plan_yield_policy.cpp',
'query/query_yield.cpp',
diff --git a/src/mongo/db/exec/plan_stats.cpp b/src/mongo/db/exec/plan_stats.cpp
deleted file mode 100644
index 9d63e979267..00000000000
--- a/src/mongo/db/exec/plan_stats.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/exec/plan_stats.h"
-#include "mongo/db/jsobj.h"
-
-namespace mongo {
-
-void CommonStats::writeExplainTo(BSONObjBuilder* bob) const {
- if (NULL == bob) {
- return;
- }
- // potential overflow because original counters are unsigned 64-bit values
- bob->append("works", static_cast<long long>(works));
- bob->append("advanced", static_cast<long long>(advanced));
-}
-
-// forward to CommonStats for now
-// TODO: fill in specific stats
-void PlanStageStats::writeExplainTo(BSONObjBuilder* bob) const {
- common.writeExplainTo(bob);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stats_test.cpp b/src/mongo/db/exec/plan_stats_test.cpp
deleted file mode 100644
index 4846a2f6e98..00000000000
--- a/src/mongo/db/exec/plan_stats_test.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-/**
- * This file contains tests for mongo/db/exec/plan_stats.h
- */
-
-#include "mongo/db/exec/plan_stats.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/unittest/unittest.h"
-
-using namespace mongo;
-
-namespace {
-
-/**
- * Basic test on field initializers
- */
-TEST(CommonStatsTest, defaultValues) {
- CommonStats stats;
- ASSERT_EQUALS(stats.works, static_cast<size_t>(0));
- ASSERT_EQUALS(stats.yields, static_cast<size_t>(0));
- ASSERT_EQUALS(stats.invalidates, static_cast<size_t>(0));
- ASSERT_EQUALS(stats.advanced, static_cast<size_t>(0));
- ASSERT_EQUALS(stats.needTime, static_cast<size_t>(0));
- ASSERT_EQUALS(stats.needYield, static_cast<size_t>(0));
- ASSERT_FALSE(stats.isEOF);
-}
-
-/**
- * Verifies null argument check in CommonStats::writeExplainTo
- */
-TEST(CommonStatsTest, writeExplainToNullBuilder) {
- CommonStats stats;
- stats.writeExplainTo(NULL);
-}
-
-/**
- * Verifies null argument check in PlanStageStats::writeExplainTo
- */
-TEST(PlanStageStatsTest, writeExplainToNullBuilder) {
- CommonStats stats;
- PlanStageStats pss(stats);
- pss.writeExplainTo(NULL);
-}
-
-/**
- * Checks BSON output of CommonStats::writeExplainTo to ensure it contains
- * correct values for CommonStats fields
- */
-TEST(CommonStatsTest, writeExplainTo) {
- CommonStats stats;
- stats.works = static_cast<size_t>(2);
- stats.advanced = static_cast<size_t>(3);
- BSONObjBuilder bob;
- stats.writeExplainTo(&bob);
- BSONObj obj = bob.done();
- ASSERT_TRUE(obj.hasField("works"));
- ASSERT_EQUALS(obj.getIntField("works"), 2);
- ASSERT_TRUE(obj.hasField("advanced"));
- ASSERT_EQUALS(obj.getIntField("advanced"), 3);
-}
-
-/**
- * Checks BSON output of PlanStageStats::writeExplainTo to ensure it contains
- * correct values for CommonStats fields
- */
-TEST(PlanStageStatsTest, writeExplainTo) {
- CommonStats stats;
- stats.works = static_cast<size_t>(2);
- stats.advanced = static_cast<size_t>(3);
- BSONObjBuilder bob;
- PlanStageStats pss(stats);
- pss.writeExplainTo(&bob);
- BSONObj obj = bob.done();
- ASSERT_TRUE(obj.hasField("works"));
- ASSERT_EQUALS(obj.getIntField("works"), 2);
- ASSERT_TRUE(obj.hasField("advanced"));
- ASSERT_EQUALS(obj.getIntField("advanced"), 3);
-}
-
-} // namespace
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index b789a0cf338..9c97d30903d 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -31,7 +31,6 @@
#pragma once
#include <boost/optional.hpp>
-#include <queue>
#include "mongo/base/status.h"
#include "mongo/db/catalog/util/partitioned.h"
@@ -138,6 +137,13 @@ public:
};
/**
+ * RegistrationToken is the type of key used to register this PlanExecutor with the
+ * CursorManager.
+ */
+ using RegistrationToken =
+ boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId>;
+
+ /**
* This class will ensure a PlanExecutor is disposed before it is deleted.
*/
class Deleter {
@@ -146,9 +152,10 @@ public:
* Constructs an empty deleter. Useful for creating a
* unique_ptr<PlanExecutor, PlanExecutor::Deleter> without populating it.
*/
- Deleter() {}
+ Deleter() = default;
- Deleter(OperationContext* opCtx, const Collection* collection);
+ inline Deleter(OperationContext* opCtx, CursorManager* cursorManager)
+ : _opCtx(opCtx), _cursorManager(cursorManager) {}
/**
* If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume
@@ -164,7 +171,19 @@ public:
* been registered with the CursorManager, will deregister it. If 'execPtr' is a yielding
* PlanExecutor, callers must hold a lock on the collection in at least MODE_IS.
*/
- void operator()(PlanExecutor* execPtr);
+ inline void operator()(PlanExecutor* execPtr) {
+ try {
+ // It is illegal to invoke operator() on a default constructed Deleter.
+ invariant(_opCtx);
+ if (!_dismissed) {
+ execPtr->dispose(_opCtx, _cursorManager);
+ }
+ delete execPtr;
+ } catch (...) {
+ std::terminate();
+ }
+ }
+
private:
OperationContext* _opCtx = nullptr;
@@ -238,6 +257,13 @@ public:
const Collection* collection,
YieldPolicy yieldPolicy);
+ /**
+ * A PlanExecutor must be disposed before destruction. In most cases, this will happen
+ * automatically through a PlanExecutor::Deleter or a ClientCursor.
+ */
+ PlanExecutor() = default;
+ virtual ~PlanExecutor() = default;
+
//
// Accessors
//
@@ -245,38 +271,27 @@ public:
/**
* Get the working set used by this executor, without transferring ownership.
*/
- WorkingSet* getWorkingSet() const;
+ virtual WorkingSet* getWorkingSet() const = 0;
/**
* Get the stage tree wrapped by this executor, without transferring ownership.
*/
- PlanStage* getRootStage() const;
+ virtual PlanStage* getRootStage() const = 0;
/**
* Get the query that this executor is executing, without transferring ownership.
*/
- CanonicalQuery* getCanonicalQuery() const;
+ virtual CanonicalQuery* getCanonicalQuery() const = 0;
/**
* Return the NS that the query is running over.
*/
- const NamespaceString& nss() const {
- return _nss;
- }
+ virtual const NamespaceString& nss() const = 0;
/**
* Return the OperationContext that the plan is currently executing within.
*/
- OperationContext* getOpCtx() const;
-
- /**
- * Generates a tree of stats objects with a separate lifetime from the execution
- * stage tree wrapped by this PlanExecutor.
- *
- * This may be called without holding any locks. It also may be called on a PlanExecutor that
- * has been killed or has produced an error.
- */
- std::unique_ptr<PlanStageStats> getStats() const;
+ virtual OperationContext* getOpCtx() const = 0;
//
// Methods that just pass down to the PlanStage tree.
@@ -288,7 +303,7 @@ public:
* While in the "saved" state, it is only legal to call restoreState,
* detachFromOperationContext, or the destructor.
*/
- void saveState();
+ virtual void saveState() = 0;
/**
* Restores the state saved by a saveState() call.
@@ -304,7 +319,7 @@ public:
* this scenario, locks will have been released, and will not be held when control returns to
* the caller.
*/
- Status restoreState();
+ virtual Status restoreState() = 0;
/**
* Detaches from the OperationContext and releases any storage-engine state.
@@ -313,7 +328,7 @@ public:
* only legal to call reattachToOperationContext or the destructor. It is not legal to call
* detachFromOperationContext() while already in the detached state.
*/
- void detachFromOperationContext();
+ virtual void detachFromOperationContext() = 0;
/**
* Reattaches to the OperationContext and reacquires any storage-engine state.
@@ -321,7 +336,7 @@ public:
* It is only legal to call this in the "detached" state. On return, the cursor is left in a
* "saved" state, so callers must still call restoreState to use this object.
*/
- void reattachToOperationContext(OperationContext* opCtx);
+ virtual void reattachToOperationContext(OperationContext* opCtx) = 0;
/**
* Same as restoreState but without the logic to retry if a WriteConflictException is
@@ -329,7 +344,7 @@ public:
*
* This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE.
*/
- Status restoreStateWithoutRetrying();
+ virtual Status restoreStateWithoutRetrying() = 0;
//
// Running Support
@@ -344,9 +359,9 @@ public:
*
* If a YIELD_AUTO policy is set, then this method may yield.
*/
- ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
+ virtual ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) = 0;
- ExecState getNext(BSONObj* objOut, RecordId* dlOut);
+ virtual ExecState getNext(BSONObj* objOut, RecordId* dlOut) = 0;
/**
* Returns 'true' if the plan is done producing results (or writing), 'false' otherwise.
@@ -354,7 +369,7 @@ public:
* Tailable cursors are a possible exception to this: they may have further results even if
* isEOF() returns true.
*/
- bool isEOF();
+ virtual bool isEOF() = 0;
/**
* Execute the plan to completion, throwing out the results. Used when you want to work the
@@ -366,7 +381,7 @@ public:
* error occurs, it is illegal to subsequently access the collection, since it may have been
* dropped.
*/
- Status executePlan();
+ virtual Status executePlan() = 0;
//
// Concurrency-related methods.
@@ -380,7 +395,7 @@ public:
* method is called multiple times, only the first 'killStatus' will be retained. It is an error
* to call this method with Status::OK.
*/
- void markAsKilled(Status killStatus);
+ virtual void markAsKilled(Status killStatus) = 0;
/**
* Cleans up any state associated with this PlanExecutor. Must be called before deleting this
@@ -395,7 +410,7 @@ public:
* is the owner's responsibility to call dispose() with a valid OperationContext before
* deleting the PlanExecutor.
*/
- void dispose(OperationContext* opCtx, CursorManager* cursorManager);
+ virtual void dispose(OperationContext* opCtx, CursorManager* cursorManager) = 0;
/**
* Helper method to aid in displaying an ExecState for debug or other recreational purposes.
@@ -414,173 +429,39 @@ public:
* If used in combination with getNextSnapshotted(), then the SnapshotId associated with
* 'obj' will be null when 'obj' is dequeued.
*/
- void enqueue(const BSONObj& obj);
+ virtual void enqueue(const BSONObj& obj) = 0;
/**
* Helper method which returns a set of BSONObj, where each represents a sort order of our
* output.
*/
- BSONObjSet getOutputSorts() const;
+ virtual BSONObjSet getOutputSorts() const = 0;
/**
* Communicate to this PlanExecutor that it is no longer registered with the CursorManager as a
* 'non-cached PlanExecutor'.
*/
- void unsetRegistered() {
- _registrationToken.reset();
- }
-
- boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId>
- getRegistrationToken() const& {
- return _registrationToken;
- }
+ virtual void unsetRegistered() = 0;
+ virtual RegistrationToken getRegistrationToken() const& = 0;
void getRegistrationToken() && = delete;
+ virtual void setRegistrationToken(RegistrationToken token) & = 0;
- void setRegistrationToken(
- Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId token) & {
- invariant(!_registrationToken);
- _registrationToken = token;
- }
-
- bool isMarkedAsKilled() const {
- return !_killStatus.isOK();
- }
-
- Status getKillStatus() {
- invariant(isMarkedAsKilled());
- return _killStatus;
- }
+ virtual bool isMarkedAsKilled() const = 0;
+ virtual Status getKillStatus() = 0;
- bool isDisposed() const {
- return _currentState == kDisposed;
- }
-
- bool isDetached() const {
- return _currentState == kDetached;
- }
+ virtual bool isDisposed() const = 0;
+ virtual bool isDetached() const = 0;
/**
* If the last oplog timestamp is being tracked for this PlanExecutor, return it.
* Otherwise return a null timestamp.
*/
- Timestamp getLatestOplogTimestamp();
-
-private:
- /**
- * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
- * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
- */
- bool shouldListenForInserts();
-
- /**
- * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
- * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
- * should be returned immediately.
- */
- bool shouldWaitForInserts();
-
- /**
- * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
- * is not capable of yielding based on a notifier.
- */
- std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier();
-
- /**
- * Yields locks and waits for inserts to the collection. Returns ADVANCED if there has been an
- * insertion and there may be new results. Returns DEAD if the PlanExecutor was killed during a
- * yield. This method is only to be used for tailable and awaitData cursors, so rather than
- * returning DEAD if the operation has exceeded its time limit, we return IS_EOF to preserve
- * this PlanExecutor for future use.
- *
- * If an error is encountered and 'errorObj' is provided, it is populated with an object
- * describing the error.
- */
- ExecState waitForInserts(CappedInsertNotifierData* notifierData,
- Snapshotted<BSONObj>* errorObj);
-
- ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
+ virtual Timestamp getLatestOplogTimestamp() = 0;
/**
- * New PlanExecutor instances are created with the static make() methods above.
+ * Turns a BSONObj representing an error status produced by getNext() into a Status.
*/
- PlanExecutor(OperationContext* opCtx,
- std::unique_ptr<WorkingSet> ws,
- std::unique_ptr<PlanStage> rt,
- std::unique_ptr<QuerySolution> qs,
- std::unique_ptr<CanonicalQuery> cq,
- const Collection* collection,
- NamespaceString nss,
- YieldPolicy yieldPolicy);
-
- /**
- * A PlanExecutor must be disposed before destruction. In most cases, this will happen
- * automatically through a PlanExecutor::Deleter or a ClientCursor.
- */
- ~PlanExecutor();
-
- /**
- * Public factory methods delegate to this private factory to do their work.
- */
- static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
- OperationContext* opCtx,
- std::unique_ptr<WorkingSet> ws,
- std::unique_ptr<PlanStage> rt,
- std::unique_ptr<QuerySolution> qs,
- std::unique_ptr<CanonicalQuery> cq,
- const Collection* collection,
- NamespaceString nss,
- YieldPolicy yieldPolicy);
-
- /**
- * Clients of PlanExecutor expect that on receiving a new instance from one of the make()
- * factory methods, plan selection has already been completed. In order to enforce this
- * property, this function is called to do plan selection prior to returning the new
- * PlanExecutor.
- *
- * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage,
- * this calls into their underlying plan selection facilities. Otherwise, does nothing.
- *
- * If a YIELD_AUTO policy is set then locks are yielded during plan selection.
- *
- * Returns a non-OK status if query planning fails. In particular, this function returns
- * ErrorCodes::QueryPlanKilled if plan execution cannot proceed due to a concurrent write or
- * catalog operation.
- */
- Status pickBestPlan(const Collection* collection);
-
- // The OperationContext that we're executing within. This can be updated if necessary by using
- // detachFromOperationContext() and reattachToOperationContext().
- OperationContext* _opCtx;
-
- std::unique_ptr<CanonicalQuery> _cq;
- std::unique_ptr<WorkingSet> _workingSet;
- std::unique_ptr<QuerySolution> _qs;
- std::unique_ptr<PlanStage> _root;
-
- // If _killStatus has a non-OK value, then we have been killed and the value represents the
- // reason for the kill.
- Status _killStatus = Status::OK();
-
- // What namespace are we operating over?
- NamespaceString _nss;
-
- // This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL.
- // TODO make this a non-pointer member. This requires some header shuffling so that this
- // file includes plan_yield_policy.h rather than the other way around.
- const std::unique_ptr<PlanYieldPolicy> _yieldPolicy;
-
- // A stash of results generated by this plan that the user of the PlanExecutor didn't want
- // to consume yet. We empty the queue before retrieving further results from the plan
- // stages.
- std::queue<BSONObj> _stash;
-
- enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable;
-
- // Set if this PlanExecutor is registered with the CursorManager.
- boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId>
- _registrationToken;
-
- bool _everDetachedFromOperationContext = false;
+ virtual Status getMemberObjectStatus(const BSONObj& memberObj) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 111fa4b376f..037df73f515 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -32,7 +32,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
@@ -132,7 +132,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
unique_ptr<PlanStage> rt,
const Collection* collection,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(
+ return PlanExecutorImpl::make(
opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy);
}
@@ -143,14 +143,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
unique_ptr<PlanStage> rt,
NamespaceString nss,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(opCtx,
- std::move(ws),
- std::move(rt),
- nullptr,
- nullptr,
- nullptr,
- std::move(nss),
- yieldPolicy);
+ return PlanExecutorImpl::make(opCtx,
+ std::move(ws),
+ std::move(rt),
+ nullptr,
+ nullptr,
+ nullptr,
+ std::move(nss),
+ yieldPolicy);
}
// static
@@ -161,7 +161,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(
+ return PlanExecutorImpl::make(
opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy);
}
@@ -174,18 +174,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(opCtx,
- std::move(ws),
- std::move(rt),
- std::move(qs),
- std::move(cq),
- collection,
- {},
- yieldPolicy);
+ return PlanExecutorImpl::make(opCtx,
+ std::move(ws),
+ std::move(rt),
+ std::move(qs),
+ std::move(cq),
+ collection,
+ {},
+ yieldPolicy);
}
// static
-StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
+StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutorImpl::make(
OperationContext* opCtx,
unique_ptr<WorkingSet> ws,
unique_ptr<PlanStage> rt,
@@ -195,19 +195,19 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
NamespaceString nss,
YieldPolicy yieldPolicy) {
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec(
- new PlanExecutor(opCtx,
- std::move(ws),
- std::move(rt),
- std::move(qs),
- std::move(cq),
- collection,
- std::move(nss),
- yieldPolicy),
- PlanExecutor::Deleter(opCtx, collection));
+ auto execImpl = new PlanExecutorImpl(opCtx,
+ std::move(ws),
+ std::move(rt),
+ std::move(qs),
+ std::move(cq),
+ collection,
+ std::move(nss),
+ yieldPolicy);
+ PlanExecutor::Deleter planDeleter(opCtx, collection ? collection->getCursorManager() : nullptr);
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec(execImpl, std::move(planDeleter));
// Perform plan selection, if necessary.
- Status status = exec->pickBestPlan(collection);
+ Status status = execImpl->_pickBestPlan();
if (!status.isOK()) {
return status;
}
@@ -215,14 +215,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
return std::move(exec);
}
-PlanExecutor::PlanExecutor(OperationContext* opCtx,
- unique_ptr<WorkingSet> ws,
- unique_ptr<PlanStage> rt,
- unique_ptr<QuerySolution> qs,
- unique_ptr<CanonicalQuery> cq,
- const Collection* collection,
- NamespaceString nss,
- YieldPolicy yieldPolicy)
+PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx,
+ unique_ptr<WorkingSet> ws,
+ unique_ptr<PlanStage> rt,
+ unique_ptr<QuerySolution> qs,
+ unique_ptr<CanonicalQuery> cq,
+ const Collection* collection,
+ NamespaceString nss,
+ YieldPolicy yieldPolicy)
: _opCtx(opCtx),
_cq(std::move(cq)),
_workingSet(std::move(ws)),
@@ -247,7 +247,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
}
}
-Status PlanExecutor::pickBestPlan(const Collection* collection) {
+Status PlanExecutorImpl::_pickBestPlan() {
invariant(_currentState == kUsable);
// First check if we need to do subplanning.
@@ -278,7 +278,7 @@ Status PlanExecutor::pickBestPlan(const Collection* collection) {
return Status::OK();
}
-PlanExecutor::~PlanExecutor() {
+PlanExecutorImpl::~PlanExecutorImpl() {
invariant(_currentState == kDisposed);
}
@@ -296,23 +296,23 @@ string PlanExecutor::statestr(ExecState s) {
}
}
-WorkingSet* PlanExecutor::getWorkingSet() const {
+WorkingSet* PlanExecutorImpl::getWorkingSet() const {
return _workingSet.get();
}
-PlanStage* PlanExecutor::getRootStage() const {
+PlanStage* PlanExecutorImpl::getRootStage() const {
return _root.get();
}
-CanonicalQuery* PlanExecutor::getCanonicalQuery() const {
+CanonicalQuery* PlanExecutorImpl::getCanonicalQuery() const {
return _cq.get();
}
-unique_ptr<PlanStageStats> PlanExecutor::getStats() const {
- return _root->getStats();
+const NamespaceString& PlanExecutorImpl::nss() const {
+ return _nss;
}
-BSONObjSet PlanExecutor::getOutputSorts() const {
+BSONObjSet PlanExecutorImpl::getOutputSorts() const {
if (_qs && _qs->root) {
_qs->root->computeProperties();
return _qs->root->getSort();
@@ -337,11 +337,11 @@ BSONObjSet PlanExecutor::getOutputSorts() const {
return SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
-OperationContext* PlanExecutor::getOpCtx() const {
+OperationContext* PlanExecutorImpl::getOpCtx() const {
return _opCtx;
}
-void PlanExecutor::saveState() {
+void PlanExecutorImpl::saveState() {
invariant(_currentState == kUsable || _currentState == kSaved);
// The query stages inside this stage tree might buffer record ids (e.g. text, geoNear,
@@ -355,7 +355,7 @@ void PlanExecutor::saveState() {
_currentState = kSaved;
}
-Status PlanExecutor::restoreState() {
+Status PlanExecutorImpl::restoreState() {
try {
return restoreStateWithoutRetrying();
} catch (const WriteConflictException&) {
@@ -367,7 +367,7 @@ Status PlanExecutor::restoreState() {
}
}
-Status PlanExecutor::restoreStateWithoutRetrying() {
+Status PlanExecutorImpl::restoreStateWithoutRetrying() {
invariant(_currentState == kSaved);
if (!isMarkedAsKilled()) {
@@ -378,7 +378,7 @@ Status PlanExecutor::restoreStateWithoutRetrying() {
return _killStatus;
}
-void PlanExecutor::detachFromOperationContext() {
+void PlanExecutorImpl::detachFromOperationContext() {
invariant(_currentState == kSaved);
_opCtx = nullptr;
_root->detachFromOperationContext();
@@ -386,7 +386,7 @@ void PlanExecutor::detachFromOperationContext() {
_everDetachedFromOperationContext = true;
}
-void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) {
+void PlanExecutorImpl::reattachToOperationContext(OperationContext* opCtx) {
invariant(_currentState == kDetached);
// We're reattaching for a getMore now. Reset the yield timer in order to prevent from
@@ -398,9 +398,9 @@ void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) {
_currentState = kSaved;
}
-PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
+PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) {
Snapshotted<BSONObj> snapshotted;
- ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut);
+ ExecState state = _getNextImpl(objOut ? &snapshotted : NULL, dlOut);
if (objOut) {
*objOut = snapshotted.value();
@@ -409,24 +409,24 @@ PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut)
return state;
}
-PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
- RecordId* dlOut) {
+PlanExecutor::ExecState PlanExecutorImpl::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
+ RecordId* dlOut) {
// Detaching from the OperationContext means that the returned snapshot ids could be invalid.
invariant(!_everDetachedFromOperationContext);
- return getNextImpl(objOut, dlOut);
+ return _getNextImpl(objOut, dlOut);
}
-bool PlanExecutor::shouldListenForInserts() {
+bool PlanExecutorImpl::_shouldListenForInserts() {
return _cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
awaitDataState(_opCtx).waitForInsertsDeadline >
_opCtx->getServiceContext()->getPreciseClockSource()->now();
}
-bool PlanExecutor::shouldWaitForInserts() {
+bool PlanExecutorImpl::_shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
- if (shouldListenForInserts()) {
+ if (_shouldListenForInserts()) {
// We expect awaitData cursors to be yielding.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -443,7 +443,7 @@ bool PlanExecutor::shouldWaitForInserts() {
return false;
}
-std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() {
+std::shared_ptr<CappedInsertNotifier> PlanExecutorImpl::_getCappedInsertNotifier() {
// We don't expect to need a capped insert notifier for non-yielding plans.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -458,8 +458,8 @@ std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() {
return collection->getCappedInsertNotifier();
}
-PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData,
- Snapshotted<BSONObj>* errorObj) {
+PlanExecutor::ExecState PlanExecutorImpl::_waitForInserts(CappedInsertNotifierData* notifierData,
+ Snapshotted<BSONObj>* errorObj) {
invariant(notifierData->notifier);
// The notifier wait() method will not wait unless the version passed to it matches the
@@ -490,7 +490,8 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n
return DEAD;
}
-PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
+PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<BSONObj>* objOut,
+ RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::InternalError,
str::stream() << "PlanExecutor hit planExecutorAlwaysFails fail point");
@@ -523,9 +524,9 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
// insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
// insert notifier is necessary for the notifierVersion to advance.
CappedInsertNotifierData cappedInsertNotifierData;
- if (shouldListenForInserts()) {
+ if (_shouldListenForInserts()) {
// We always construct the CappedInsertNotifier for awaitData cursors.
- cappedInsertNotifierData.notifier = getCappedInsertNotifier();
+ cappedInsertNotifierData.notifier = _getCappedInsertNotifier();
}
for (;;) {
// These are the conditions which can cause us to yield:
@@ -609,10 +610,10 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
"enabled. Blocking until fail point is disabled.";
MONGO_FAIL_POINT_PAUSE_WHILE_SET(planExecutorHangBeforeShouldWaitForInserts);
}
- if (!shouldWaitForInserts()) {
+ if (!_shouldWaitForInserts()) {
return PlanExecutor::IS_EOF;
}
- const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut);
+ const ExecState waitResult = _waitForInserts(&cappedInsertNotifierData, objOut);
if (waitResult == PlanExecutor::ADVANCED) {
// There may be more results, keep going.
continue;
@@ -633,12 +634,12 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
}
}
-bool PlanExecutor::isEOF() {
+bool PlanExecutorImpl::isEOF() {
invariant(_currentState == kUsable);
return isMarkedAsKilled() || (_stash.empty() && _root->isEOF());
}
-void PlanExecutor::markAsKilled(Status killStatus) {
+void PlanExecutorImpl::markAsKilled(Status killStatus) {
invariant(!killStatus.isOK());
// If killed multiple times, only retain the first status.
if (_killStatus.isOK()) {
@@ -646,7 +647,7 @@ void PlanExecutor::markAsKilled(Status killStatus) {
}
}
-void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager) {
+void PlanExecutorImpl::dispose(OperationContext* opCtx, CursorManager* cursorManager) {
if (_currentState == kDisposed) {
return;
}
@@ -664,7 +665,7 @@ void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager
_currentState = kDisposed;
}
-Status PlanExecutor::executePlan() {
+Status PlanExecutorImpl::executePlan() {
invariant(_currentState == kUsable);
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
@@ -677,7 +678,7 @@ Status PlanExecutor::executePlan() {
return _killStatus;
}
- auto errorStatus = WorkingSetCommon::getMemberObjectStatus(obj);
+ auto errorStatus = getMemberObjectStatus(obj);
invariant(!errorStatus.isOK());
return errorStatus.withContext(str::stream() << "Exec error resulting in state "
<< PlanExecutor::statestr(state));
@@ -689,11 +690,41 @@ Status PlanExecutor::executePlan() {
}
-void PlanExecutor::enqueue(const BSONObj& obj) {
+void PlanExecutorImpl::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
-Timestamp PlanExecutor::getLatestOplogTimestamp() {
+void PlanExecutorImpl::unsetRegistered() {
+ _registrationToken.reset();
+}
+
+PlanExecutor::RegistrationToken PlanExecutorImpl::getRegistrationToken() const& {
+ return _registrationToken;
+}
+
+void PlanExecutorImpl::setRegistrationToken(RegistrationToken token)& {
+ invariant(!_registrationToken);
+ _registrationToken = token;
+}
+
+bool PlanExecutorImpl::isMarkedAsKilled() const {
+ return !_killStatus.isOK();
+}
+
+Status PlanExecutorImpl::getKillStatus() {
+ invariant(isMarkedAsKilled());
+ return _killStatus;
+}
+
+bool PlanExecutorImpl::isDisposed() const {
+ return _currentState == kDisposed;
+}
+
+bool PlanExecutorImpl::isDetached() const {
+ return _currentState == kDetached;
+}
+
+Timestamp PlanExecutorImpl::getLatestOplogTimestamp() {
if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN))
@@ -701,23 +732,8 @@ Timestamp PlanExecutor::getLatestOplogTimestamp() {
return Timestamp();
}
-//
-// PlanExecutor::Deleter
-//
-
-PlanExecutor::Deleter::Deleter(OperationContext* opCtx, const Collection* collection)
- : _opCtx(opCtx), _cursorManager(collection ? collection->getCursorManager() : nullptr) {}
-
-void PlanExecutor::Deleter::operator()(PlanExecutor* execPtr) {
- try {
- invariant(_opCtx); // It is illegal to invoke operator() on a default constructed Deleter.
- if (!_dismissed) {
- execPtr->dispose(_opCtx, _cursorManager);
- }
- delete execPtr;
- } catch (...) {
- std::terminate();
- }
+Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const {
+ return WorkingSetCommon::getMemberObjectStatus(memberObj);
}
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
new file mode 100644
index 00000000000..6447d014cdc
--- /dev/null
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -0,0 +1,189 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <queue>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/query/plan_executor.h"
+
+namespace mongo {
+
+class PlanExecutorImpl : public PlanExecutor {
+ MONGO_DISALLOW_COPYING(PlanExecutorImpl);
+
+public:
+ /**
+ * Public factory methods delegate to this impl factory to do their work.
+ */
+ static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
+ OperationContext* opCtx,
+ std::unique_ptr<WorkingSet> ws,
+ std::unique_ptr<PlanStage> rt,
+ std::unique_ptr<QuerySolution> qs,
+ std::unique_ptr<CanonicalQuery> cq,
+ const Collection* collection,
+ NamespaceString nss,
+ YieldPolicy yieldPolicy);
+
+ virtual ~PlanExecutorImpl();
+ WorkingSet* getWorkingSet() const final;
+ PlanStage* getRootStage() const final;
+ CanonicalQuery* getCanonicalQuery() const final;
+ const NamespaceString& nss() const final;
+ OperationContext* getOpCtx() const final;
+ void saveState() final;
+ Status restoreState() final;
+ void detachFromOperationContext() final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
+ Status restoreStateWithoutRetrying() final;
+ ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) final;
+ ExecState getNext(BSONObj* objOut, RecordId* dlOut) final;
+ bool isEOF() final;
+ Status executePlan() final;
+ void markAsKilled(Status killStatus) final;
+ void dispose(OperationContext* opCtx, CursorManager* cursorManager) final;
+ void enqueue(const BSONObj& obj) final;
+ BSONObjSet getOutputSorts() const final;
+ void unsetRegistered() final;
+ RegistrationToken getRegistrationToken() const&;
+ void setRegistrationToken(RegistrationToken token) & final;
+ bool isMarkedAsKilled() const final;
+ Status getKillStatus() final;
+ bool isDisposed() const final;
+ bool isDetached() const final;
+ Timestamp getLatestOplogTimestamp() final;
+ Status getMemberObjectStatus(const BSONObj& memberObj) const final;
+
+private:
+ /**
+ * New PlanExecutor instances are created with the static make() method above.
+ */
+ PlanExecutorImpl(OperationContext* opCtx,
+ std::unique_ptr<WorkingSet> ws,
+ std::unique_ptr<PlanStage> rt,
+ std::unique_ptr<QuerySolution> qs,
+ std::unique_ptr<CanonicalQuery> cq,
+ const Collection* collection,
+ NamespaceString nss,
+ YieldPolicy yieldPolicy);
+
+ /**
+ * Clients of PlanExecutor expect that on receiving a new instance from one of the make()
+ * factory methods, plan selection has already been completed. In order to enforce this
+ * property, this function is called to do plan selection prior to returning the new
+ * PlanExecutor.
+ *
+ * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage,
+ * this calls into their underlying plan selection facilities. Otherwise, does nothing.
+ *
+ * If a YIELD_AUTO policy is set then locks are yielded during plan selection.
+ *
+ * Returns a non-OK status if query planning fails. In particular, this function returns
+ * ErrorCodes::QueryPlanKilled if plan execution cannot proceed due to a concurrent write or
+ * catalog operation.
+ */
+ Status _pickBestPlan();
+
+ /**
+ * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
+ * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
+ */
+ bool _shouldListenForInserts();
+
+ /**
+ * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
+ * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
+ * should be returned immediately.
+ */
+ bool _shouldWaitForInserts();
+
+ /**
+ * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
+ * is not capable of yielding based on a notifier.
+ */
+ std::shared_ptr<CappedInsertNotifier> _getCappedInsertNotifier();
+
+ /**
+ * Yields locks and waits for inserts to the collection. Returns ADVANCED if there has been an
+ * insertion and there may be new results. Returns DEAD if the PlanExecutor was killed during a
+ * yield. This method is only to be used for tailable and awaitData cursors, so rather than
+ * returning DEAD if the operation has exceeded its time limit, we return IS_EOF to preserve
+ * this PlanExecutor for future use.
+ *
+ * If an error is encountered and 'errorObj' is provided, it is populated with an object
+ * describing the error.
+ */
+ ExecState _waitForInserts(CappedInsertNotifierData* notifierData,
+ Snapshotted<BSONObj>* errorObj);
+
+ /**
+ * Common implementation for getNext() and getNextSnapshotted().
+ */
+ ExecState _getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
+
+ // The OperationContext that we're executing within. This can be updated if necessary by using
+ // detachFromOperationContext() and reattachToOperationContext().
+ OperationContext* _opCtx;
+
+ std::unique_ptr<CanonicalQuery> _cq;
+ std::unique_ptr<WorkingSet> _workingSet;
+ std::unique_ptr<QuerySolution> _qs;
+ std::unique_ptr<PlanStage> _root;
+
+ // If _killStatus has a non-OK value, then we have been killed and the value represents the
+ // reason for the kill.
+ Status _killStatus = Status::OK();
+
+ // What namespace are we operating over?
+ NamespaceString _nss;
+
+ // This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL.
+ // TODO make this a non-pointer member. This requires some header shuffling so that this
+ // file includes plan_yield_policy.h rather than the other way around.
+ const std::unique_ptr<PlanYieldPolicy> _yieldPolicy;
+
+ // A stash of results generated by this plan that the user of the PlanExecutor didn't want
+ // to consume yet. We empty the queue before retrieving further results from the plan
+ // stages.
+ std::queue<BSONObj> _stash;
+
+ enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable;
+
+ // Set if this PlanExecutor is registered with the CursorManager.
+ boost::optional<Partitioned<stdx::unordered_set<PlanExecutor*>>::PartitionId>
+ _registrationToken;
+
+ bool _everDetachedFromOperationContext = false;
+};
+
+} // namespace mongo