diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-07-28 17:17:51 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-28 11:24:48 -0400 |
commit | 55a85da4980f1967f88bbccbd43646ee89c6301f (patch) | |
tree | d0911d9ca87de609e2a3d4d5391ec0752a472f5f | |
parent | 6e2cc35d6d4370804f09665b243d1e4d5d418ec0 (diff) | |
download | mongo-55a85da4980f1967f88bbccbd43646ee89c6301f.tar.gz |
SERVER-30410 Ensure executor is saved after tailable cursor time out.
25 files changed, 1718 insertions, 1304 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index deb64fdbca6..f70dd9649fc 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -236,7 +236,10 @@ mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx, // abandonSnapshot. exec->saveState(); opCtx->recoveryUnit()->abandonSnapshot(); - exec->restoreState(); // Handles any WCEs internally. + auto restoreStatus = exec->restoreState(); // Handles any WCEs internally. + if (!restoreStatus.isOK()) { + return restoreStatus; + } } } diff --git a/src/mongo/db/catalog/index_create_impl.cpp b/src/mongo/db/catalog/index_create_impl.cpp index 61649dc3f33..95b7a90f358 100644 --- a/src/mongo/db/catalog/index_create_impl.cpp +++ b/src/mongo/db/catalog/index_create_impl.cpp @@ -350,8 +350,12 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection(std::set<RecordId>* d // Fail the index build hard. return ret; } - if (_buildInBackground) - exec->restoreState(); // Handles any WCEs internally. + if (_buildInBackground) { + auto restoreStatus = exec->restoreState(); // Handles any WCEs internally. + if (!restoreStatus.isOK()) { + return restoreStatus; + } + } // Go to the next document progress->hit(); @@ -366,7 +370,10 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection(std::set<RecordId>* d // abandonSnapshot. exec->saveState(); _opCtx->recoveryUnit()->abandonSnapshot(); - exec->restoreState(); // Handles any WCEs internally. + auto restoreStatus = exec->restoreState(); // Handles any WCEs internally. + if (!restoreStatus.isOK()) { + return restoreStatus; + } } } diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index f5cc3803928..4036b6e0fa1 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -751,7 +751,7 @@ public: } // Have the lock again. See if we were killed. - if (!exec->restoreState()) { + if (!exec->restoreState().isOK()) { if (!partialOk) { uasserted(13281, "File deleted during filemd5 command"); } diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index d3b2ca63078..7b78ba64fff 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -267,11 +267,11 @@ public: } // Validation related to awaitData. - if (isCursorAwaitData(cursor)) { - invariant(isCursorTailable(cursor)); + if (cursor->isAwaitData()) { + invariant(cursor->isTailable()); } - if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) { + if (request.awaitDataTimeout && !cursor->isAwaitData()) { Status status(ErrorCodes::BadValue, "cannot set maxTimeMS on getMore command for a non-awaitData cursor"); return appendCommandStatus(result, status); @@ -294,7 +294,7 @@ public: // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. - if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { + if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { opCtx->setDeadlineAfterNowBy(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); @@ -304,7 +304,7 @@ public: PlanExecutor* exec = cursor->getExecutor(); exec->reattachToOperationContext(opCtx); - exec->restoreState(); + uassertStatusOK(exec->restoreState()); auto planSummary = Explain::getPlanSummary(exec); { @@ -335,7 +335,7 @@ public: Explain::getSummaryStats(*exec, &preExecutionStats); // Mark this as an AwaitData operation if appropriate. - if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { + if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); shouldWaitForInserts(opCtx) = true; @@ -362,7 +362,7 @@ public: curOp->debug().execStats = execStatsBob.obj(); } - if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) { + if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) { respondWithId = request.cursorid; exec->saveState(); @@ -425,7 +425,6 @@ public: PlanExecutor::ExecState* state, long long* numResults) { PlanExecutor* exec = cursor->getExecutor(); - const bool isAwaitData = isCursorAwaitData(cursor); // If an awaitData getMore is killed during this process due to our max time expiring at // an interrupt point, we just continue as normal and return rather than reporting a @@ -451,13 +450,6 @@ public: // FAILURE state will make getMore command close the cursor even if it's tailable. *state = PlanExecutor::FAILURE; return Status::OK(); - } catch (const AssertionException& except) { - if (isAwaitData && except.code() == ErrorCodes::ExceededTimeLimit) { - // We ignore exceptions from interrupt points due to max time expiry for - // awaitData cursors. - } else { - throw; - } } if (PlanExecutor::FAILURE == *state) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 3cfbad4b4c7..bfcef8aa93f 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1149,11 +1149,8 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold prev = o; all.push_back(o); - if (!exec->restoreState()) { - uasserted(34375, "Plan executor killed during mapReduce final reduce"); - } - _opCtx->checkForInterrupt(); + uassertStatusOK(exec->restoreState()); } uassert(34428, @@ -1540,12 +1537,9 @@ public: scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); - if (!exec->restoreState()) { - return appendCommandStatus( - result, - Status(ErrorCodes::OperationFailed, - str::stream() - << "Executor killed during mapReduce command")); + auto restoreStatus = exec->restoreState(); + if (!restoreStatus.isOK()) { + return appendCommandStatus(result, restoreStatus); } reduceTime += t.micros(); diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp index 42780395037..55a391b57c3 100644 --- a/src/mongo/db/exec/cached_plan.cpp +++ b/src/mongo/db/exec/cached_plan.cpp @@ -181,12 +181,7 @@ Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) { // In all cases, the actual yielding happens here. if (yieldPolicy->shouldYield()) { // Here's where we yield. - bool alive = yieldPolicy->yield(_fetcher.get()); - - if (!alive) { - return Status(ErrorCodes::QueryPlanKilled, - "CachedPlanStage killed during plan selection"); - } + return yieldPolicy->yield(_fetcher.get()); } // We're done using the fetcher, so it should be freed. We don't want to diff --git a/src/mongo/db/exec/cached_plan.h b/src/mongo/db/exec/cached_plan.h index 008fc196491..658312db4a0 100644 --- a/src/mongo/db/exec/cached_plan.h +++ b/src/mongo/db/exec/cached_plan.h @@ -111,7 +111,8 @@ private: * May yield during the cached plan stage's trial period or replanning phases. * * Returns a non-OK status if query planning fails. In particular, this function returns - * ErrorCodes::QueryPlanKilled if the query plan was killed during a yield. + * ErrorCodes::QueryPlanKilled if the query plan was killed during a yield, or + * ErrorCodes::ExceededTimeLimit if the operation exceeded its time limit. */ Status tryYield(PlanYieldPolicy* yieldPolicy); diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp index b113a17cc30..4b32b06ade4 100644 --- a/src/mongo/db/exec/multi_plan.cpp +++ b/src/mongo/db/exec/multi_plan.cpp @@ -150,14 +150,13 @@ Status MultiPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) { // 3) we need to yield and retry due to a WriteConflictException. // In all cases, the actual yielding happens here. if (yieldPolicy->shouldYield()) { - bool alive = yieldPolicy->yield(_fetcher.get()); + auto yieldStatus = yieldPolicy->yield(_fetcher.get()); - if (!alive) { + if (!yieldStatus.isOK()) { _failure = true; - Status failStat(ErrorCodes::QueryPlanKilled, - "PlanExecutor killed during plan selection"); - _statusMemberId = WorkingSetCommon::allocateStatusMember(_candidates[0].ws, failStat); - return failStat; + _statusMemberId = + WorkingSetCommon::allocateStatusMember(_candidates[0].ws, yieldStatus); + return yieldStatus; } } diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h index 258e54642ba..7ec6bad81d7 100644 --- a/src/mongo/db/exec/multi_plan.h +++ b/src/mongo/db/exec/multi_plan.h @@ -172,7 +172,7 @@ private: * Checks whether we need to perform either a timing-based yield or a yield for a document * fetch. If so, then uses 'yieldPolicy' to actually perform the yield. * - * Returns a non-OK status if killed during a yield. + * Returns a non-OK status if killed during a yield or if the query has exceeded its time limit. */ Status tryYield(PlanYieldPolicy* yieldPolicy); diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp index ca8281aeb6e..021bf3efd5a 100644 --- a/src/mongo/db/exec/subplan.cpp +++ b/src/mongo/db/exec/subplan.cpp @@ -503,9 +503,11 @@ Status SubplanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { // Plan each branch of the $or. Status subplanningStatus = planSubqueries(); if (!subplanningStatus.isOK()) { - if (subplanningStatus == ErrorCodes::QueryPlanKilled) { + if (subplanningStatus == ErrorCodes::QueryPlanKilled || + subplanningStatus == ErrorCodes::ExceededTimeLimit) { // Query planning cannot continue if the plan for one of the subqueries was killed - // because the collection or a candidate index may have been dropped. + // because the collection or a candidate index may have been dropped, or if we've + // exceeded the operation's time limit. return subplanningStatus; } return choosePlanWholeQuery(yieldPolicy); @@ -515,9 +517,11 @@ Status SubplanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { // the overall winning plan from the resulting index tags. Status subplanSelectStat = choosePlanForSubqueries(yieldPolicy); if (!subplanSelectStat.isOK()) { - if (subplanSelectStat == ErrorCodes::QueryPlanKilled) { - // Query planning cannot continue if the plan was killed because the collection or a - // candidate index may have been dropped. + if (subplanSelectStat == ErrorCodes::QueryPlanKilled || + subplanSelectStat == ErrorCodes::ExceededTimeLimit) { + // Query planning cannot continue if the plan for one of the subqueries was killed + // because the collection or a candidate index may have been dropped, or if we've + // exceeded the operation's time limit. return subplanSelectStat; } return choosePlanWholeQuery(yieldPolicy); diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h index d844e27472c..315e79b93ca 100644 --- a/src/mongo/db/exec/subplan.h +++ b/src/mongo/db/exec/subplan.h @@ -102,7 +102,8 @@ public: * take place. * * Returns a non-OK status if query planning fails. In particular, this function returns - * ErrorCodes::QueryPlanKilled if the query plan was killed during a yield. + * ErrorCodes::QueryPlanKilled if the query plan was killed during a yield, or + * ErrorCodes::ExceededTimeLimit if the operation has exceeded its time limit. */ Status pickBestPlan(PlanYieldPolicy* yieldPolicy); diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index f15bafd5b8b..e31751e3a5c 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -74,7 +74,7 @@ void DocumentSourceCursor::loadBatch() { BSONObj resultObj; { AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); - _exec->restoreState(); + uassertStatusOK(_exec->restoreState()); int memUsageBytes = 0; { @@ -180,7 +180,7 @@ Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> BSONObjBuilder explainBuilder; { AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); - _exec->restoreState(); + uassertStatusOK(_exec->restoreState()); Explain::explainStages(_exec.get(), autoColl.getCollection(), *explain, &explainBuilder); _exec->saveState(); } diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index ca7fe029325..c94d393a462 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -75,14 +75,6 @@ using stdx::make_unique; // Failpoint for checking whether we've received a getmore. MONGO_FP_DECLARE(failReceivedGetmore); -bool isCursorTailable(const ClientCursor* cursor) { - return cursor->queryOptions() & QueryOption_CursorTailable; -} - -bool isCursorAwaitData(const ClientCursor* cursor) { - return cursor->queryOptions() & QueryOption_AwaitData; -} - bool shouldSaveCursor(OperationContext* opCtx, const Collection* collection, PlanExecutor::ExecState finalState, @@ -367,7 +359,7 @@ Message getMore(OperationContext* opCtx, uassert(40548, "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " "which support the getMore command can be used on tailable aggregations.", - readLock || !isCursorAwaitData(cc)); + readLock || !cc->isAwaitData()); // If the operation that spawned this cursor had a time limit set, apply leftover // time to this getmore. @@ -389,7 +381,7 @@ Message getMore(OperationContext* opCtx, uint64_t notifierVersion = 0; std::shared_ptr<CappedInsertNotifier> notifier; - if (isCursorAwaitData(cc)) { + if (cc->isAwaitData()) { invariant(readLock->getCollection()->isCapped()); // Retrieve the notifier which we will wait on until new data arrives. We make sure // to do this in the lock because once we drop the lock it is possible for the @@ -404,7 +396,7 @@ Message getMore(OperationContext* opCtx, PlanExecutor* exec = cc->getExecutor(); exec->reattachToOperationContext(opCtx); - exec->restoreState(); + uassertStatusOK(exec->restoreState()); auto planSummary = Explain::getPlanSummary(exec); { @@ -430,7 +422,7 @@ Message getMore(OperationContext* opCtx, // If this is an await data cursor, and we hit EOF without generating any results, then // we block waiting for new data to arrive. - if (isCursorAwaitData(cc) && state == PlanExecutor::IS_EOF && numResults == 0) { + if (cc->isAwaitData() && state == PlanExecutor::IS_EOF && numResults == 0) { // Save the PlanExecutor and drop our locks. exec->saveState(); readLock.reset(); @@ -445,7 +437,7 @@ Message getMore(OperationContext* opCtx, // Reacquiring locks. readLock.emplace(opCtx, nss); - exec->restoreState(); + uassertStatusOK(exec->restoreState()); // We woke up because either the timed_wait expired, or there was more data. Either // way, attempt to generate another batch of results. @@ -474,7 +466,7 @@ Message getMore(OperationContext* opCtx, // case, the pin's destructor will be invoked, which will call release() on the pin. // Because our ClientCursorPin is declared after our lock is declared, this will happen // under the lock if any locking was necessary. - if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) { + if (!shouldSaveCursorGetMore(state, exec, cc->isTailable())) { ccPin.getValue().deleteUnderlying(); // cc is now invalid, as is the executor diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 2795934ec87..42a0da92380 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -42,16 +42,6 @@ class NamespaceString; class OperationContext; /** - * Whether or not the ClientCursor* is tailable. - */ -bool isCursorTailable(const ClientCursor* cursor); - -/** - * Whether or not the ClientCursor* has the awaitData flag set. - */ -bool isCursorAwaitData(const ClientCursor* cursor); - -/** * Returns true if we should keep a cursor around because we're expecting to return more query * results. * diff --git a/src/mongo/db/query/mock_yield_policies.h b/src/mongo/db/query/mock_yield_policies.h new file mode 100644 index 00000000000..2a690853799 --- /dev/null +++ b/src/mongo/db/query/mock_yield_policies.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/error_codes.h" +#include "mongo/db/query/plan_yield_policy.h" + +namespace mongo { + +/** + * A custom yield policy that always reports the plan should yield, and always returns + * ErrorCodes::ExceededTimeLimit from yield(). + */ +class AlwaysTimeOutYieldPolicy : public PlanYieldPolicy { +public: + AlwaysTimeOutYieldPolicy(PlanExecutor* exec) + : PlanYieldPolicy(exec, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT) {} + + AlwaysTimeOutYieldPolicy(ClockSource* cs) + : PlanYieldPolicy(PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, cs) {} + + bool shouldYield() override { + return true; + } + + Status yield(RecordFetcher* recordFetcher) override { + return {ErrorCodes::ExceededTimeLimit, "Using AlwaysTimeOutYieldPolicy"}; + } + + Status yield(stdx::function<void()> beforeYieldingFn, + stdx::function<void()> whileYieldingFn) override { + return {ErrorCodes::ExceededTimeLimit, "Using AlwaysTimeOutYieldPolicy"}; + } +}; + +/** + * A custom yield policy that always reports the plan should yield, and always returns + * ErrorCodes::QueryPlanKilled from yield(). + */ +class AlwaysPlanKilledYieldPolicy : public PlanYieldPolicy { +public: + AlwaysPlanKilledYieldPolicy(PlanExecutor* exec) + : PlanYieldPolicy(exec, PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED) {} + + AlwaysPlanKilledYieldPolicy(ClockSource* cs) + : PlanYieldPolicy(PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED, cs) {} + + bool shouldYield() override { + return true; + } + + Status yield(RecordFetcher* recordFetcher) override { + return {ErrorCodes::QueryPlanKilled, "Using AlwaysPlanKilledYieldPolicy"}; + } + + Status yield(stdx::function<void()> beforeYieldingFn, + stdx::function<void()> whileYieldingFn) override { + return {ErrorCodes::QueryPlanKilled, "Using AlwaysPlanKilledYieldPolicy"}; + } +}; + +} // namespace mongo diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 4fb1a8458b4..dbeb1d56b71 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -44,6 +44,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" @@ -72,9 +73,30 @@ struct CappedInsertNotifierData { namespace { -namespace { MONGO_FP_DECLARE(planExecutorAlwaysFails); -} // namespace + +/** + * Constructs a PlanYieldPolicy based on 'policy'. + */ +std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutor* exec, + PlanExecutor::YieldPolicy policy) { + switch (policy) { + case PlanExecutor::YieldPolicy::YIELD_AUTO: + case PlanExecutor::YieldPolicy::YIELD_MANUAL: + case PlanExecutor::YieldPolicy::NO_YIELD: + case PlanExecutor::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY: { + return stdx::make_unique<PlanYieldPolicy>(exec, policy); + } + case PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT: { + return stdx::make_unique<AlwaysTimeOutYieldPolicy>(exec); + } + case PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED: { + return stdx::make_unique<AlwaysPlanKilledYieldPolicy>(exec); + } + default: + MONGO_UNREACHABLE; + } +} /** * Retrieves the first stage of a given type from the plan tree, or NULL @@ -95,7 +117,7 @@ PlanStage* getStageByType(PlanStage* root, StageType type) { return NULL; } -} +} // namespace // static StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( @@ -202,7 +224,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, _root(std::move(rt)), _nss(std::move(nss)), // There's no point in yielding if the collection doesn't exist. - _yieldPolicy(new PlanYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { + _yieldPolicy(makeYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { // We may still need to initialize _nss from either collection or _cq. if (!_nss.isEmpty()) { return; // We already have an _nss set, so there's nothing more to do. @@ -327,7 +349,7 @@ void PlanExecutor::saveState() { _currentState = kSaved; } -bool PlanExecutor::restoreState() { +Status PlanExecutor::restoreState() { try { return restoreStateWithoutRetrying(); } catch (const WriteConflictException&) { @@ -339,7 +361,7 @@ bool PlanExecutor::restoreState() { } } -bool PlanExecutor::restoreStateWithoutRetrying() { +Status PlanExecutor::restoreStateWithoutRetrying() { invariant(_currentState == kSaved); if (!isMarkedAsKilled()) { @@ -347,7 +369,9 @@ bool PlanExecutor::restoreStateWithoutRetrying() { } _currentState = kUsable; - return !isMarkedAsKilled(); + return isMarkedAsKilled() + ? Status{ErrorCodes::QueryPlanKilled, "query killed during yield: " + *_killReason} + : Status::OK(); } void PlanExecutor::detachFromOperationContext() { @@ -401,6 +425,9 @@ bool PlanExecutor::shouldWaitForInserts() { if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData() && mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + // We expect awaitData cursors to be yielding. + invariant(_yieldPolicy->canReleaseLocksDuringExecution()); + // For operations with a last committed opTime, we should not wait if the replication // coordinator's lastCommittedOpTime has changed. if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) { @@ -413,28 +440,23 @@ bool PlanExecutor::shouldWaitForInserts() { } std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() { - // If we cannot yield, we should retry immediately when we hit EOF, so do not get - // a CappedInsertNotifier. - if (!_yieldPolicy->canReleaseLocksDuringExecution()) - return nullptr; + // We don't expect to need a capped insert notifier for non-yielding plans. + invariant(_yieldPolicy->canReleaseLocksDuringExecution()); // We can only wait if we have a collection; otherwise we should retry immediately when // we hit EOF. dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); auto db = dbHolder().get(_opCtx, _nss.db()); - if (!db) - return nullptr; + invariant(db); auto collection = db->getCollection(_opCtx, _nss); - if (!collection) - return nullptr; + invariant(collection); return collection->getCappedInsertNotifier(); } -bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) { - // We tested to see if we could wait when getting the CappedInsertNotifier. - if (!notifierData->notifier) - return true; +PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData, + Snapshotted<BSONObj>* errorObj) { + invariant(notifierData->notifier); // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version @@ -446,12 +468,16 @@ bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) { ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { + auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { const auto timeout = opCtx->getRemainingMaxTimeMicros(); notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); }); notifierData->lastEOFVersion = currentNotifierVersion; - return yieldResult; + if (yieldResult.isOK()) { + // There may be more results, try to get more data. + return ADVANCED; + } + return swallowTimeoutIfAwaitData(yieldResult, errorObj); } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -505,18 +531,9 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, // 3) we need to yield and retry due to a WriteConflictException. // In all cases, the actual yielding happens here. if (_yieldPolicy->shouldYield()) { - if (!_yieldPolicy->yield(fetcher.get())) { - // A return of false from a yield should only happen if we've been killed during the - // yield. - invariant(isMarkedAsKilled()); - - if (NULL != objOut) { - Status status(ErrorCodes::OperationFailed, - str::stream() << "Operation aborted because: " << *_killReason); - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - } - return PlanExecutor::DEAD; + auto yieldStatus = _yieldPolicy->yield(fetcher.get()); + if (!yieldStatus.isOK()) { + return swallowTimeoutIfAwaitData(yieldStatus, objOut); } } @@ -589,23 +606,15 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. } else if (PlanStage::IS_EOF == code) { - if (shouldWaitForInserts()) { - const bool locksReacquiredAfterYield = waitForInserts(&cappedInsertNotifierData); - if (locksReacquiredAfterYield) { - // There may be more results, try to get more data. - continue; - } - invariant(isMarkedAsKilled()); - if (objOut) { - Status status(ErrorCodes::OperationFailed, - str::stream() << "Operation aborted because: " << *_killReason); - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - } - return PlanExecutor::DEAD; - } else { + if (!shouldWaitForInserts()) { return PlanExecutor::IS_EOF; } + const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut); + if (waitResult == PlanExecutor::ADVANCED) { + // There may be more results, keep going. + continue; + } + return waitResult; } else { invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); @@ -677,6 +686,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } +PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( + Status yieldError, Snapshotted<BSONObj>* errorObj) const { + if (yieldError == ErrorCodes::ExceededTimeLimit) { + if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData()) { + // If the cursor is tailable then exceeding the time limit should not + // destroy this PlanExecutor, we should just stop waiting for inserts. + return PlanExecutor::IS_EOF; + } + } + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldError)); + } + return PlanExecutor::DEAD; +} + // // PlanExecutor::Deleter // diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index c41a4b61106..743c818d65f 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -128,6 +128,16 @@ public: // locked during execution. For example, a PlanExecutor containing a PipelineProxyStage // which is being used to execute an aggregation pipeline. NO_YIELD, + + // Used for testing, this yield policy will cause the PlanExecutor to time out on the first + // yield, returning DEAD with an error object encoding a ErrorCodes::ExceededTimeLimit + // message. + ALWAYS_TIME_OUT, + + // Used for testing, this yield policy will cause the PlanExecutor to be marked as killed on + // the first yield, returning DEAD with an error object encoding a + // ErrorCodes::QueryPlanKilled message. + ALWAYS_MARK_KILLED, }; /** @@ -173,9 +183,10 @@ public: // // Passing YIELD_AUTO to any of these factories will construct a yielding executor which // may yield in the following circumstances: - // 1) During plan selection inside the call to make(). - // 2) On any call to getNext(). - // 3) While executing the plan inside executePlan(). + // - During plan selection inside the call to make(). + // - On any call to getNext(). + // - On any call to restoreState(). + // - While executing the plan inside executePlan(). // // The executor will also be automatically registered to receive notifications in the case of // YIELD_AUTO or YIELD_MANUAL. @@ -284,15 +295,18 @@ public: /** * Restores the state saved by a saveState() call. * - * Returns true if the state was successfully restored and the execution tree can be + * Returns Status::OK() if the state was successfully restored and the execution tree can be * work()'d. * - * Returns false if the PlanExecutor was killed while saved. A killed execution tree cannot be - * worked and should be deleted. + * Returns ErrorCodes::QueryPlanKilled if the PlanExecutor was killed while saved. * - * If allowed, will yield and retry if a WriteConflictException is encountered. + * If allowed, will yield and retry if a WriteConflictException is encountered. If the time + * limit is exceeded during this retry process, returns ErrorCodes::ExceededTimeLimit. If this + * PlanExecutor is killed during this retry process, returns ErrorCodes::QueryPlanKilled. In + * this scenario, locks will have been released, and will not be held when control returns to + * the caller. */ - bool restoreState(); + Status restoreState(); /** * Detaches from the OperationContext and releases any storage-engine state. @@ -317,7 +331,7 @@ public: * * This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE. */ - bool restoreStateWithoutRetrying(); + Status restoreStateWithoutRetrying(); // // Running Support @@ -444,19 +458,31 @@ public: } private: - // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore - // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF - // should be returned immediately. + /** + * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore + * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF + * should be returned immediately. + */ bool shouldWaitForInserts(); - // Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor - // is not capable of yielding based on a notifier. + /** + * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor + * is not capable of yielding based on a notifier. + */ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(); - // Yields locks and waits for inserts to the collection. Returns true if there may be new - // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield, - // returns true immediately. - bool waitForInserts(CappedInsertNotifierData* notifierData); + /** + * Yields locks and waits for inserts to the collection. Returns ADVANCED if there has been an + * insertion and there may be new results. Returns DEAD if the PlanExecutor was killed during a + * yield. This method is only to be used for tailable and awaitData cursors, so rather than + * returning DEAD if the operation has exceeded its time limit, we return IS_EOF to preserve + * this PlanExecutor for future use. + * + * If an error is encountered and 'errorObj' is provided, it is populated with an object + * describing the error. + */ + ExecState waitForInserts(CappedInsertNotifierData* notifierData, + Snapshotted<BSONObj>* errorObj); ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); @@ -508,6 +534,14 @@ private: */ Status pickBestPlan(const Collection* collection); + /** + * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor + * represents a tailable, awaitData cursor and whether 'yieldError' is the error object + * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and + * populates 'errorObj' with the error - if 'errorObj' is not null. + */ + ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted<BSONObj>* errorObj) const; + // The OperationContext that we're executing within. This can be updated if necessary by using // detachFromOperationContext() and reattachToOperationContext(). OperationContext* _opCtx; diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index 483e24e9f7d..e570327216d 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -71,7 +71,7 @@ void PlanYieldPolicy::resetTimer() { _elapsedTracker.resetLastTime(); } -bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) { +Status PlanYieldPolicy::yield(RecordFetcher* recordFetcher) { invariant(_planYielding); if (recordFetcher) { OperationContext* opCtx = _planYielding->getOpCtx(); @@ -82,8 +82,8 @@ bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) { } } -bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn, - stdx::function<void()> whileYieldingFn) { +Status PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn, + stdx::function<void()> whileYieldingFn) { invariant(_planYielding); invariant(canAutoYield()); @@ -103,9 +103,12 @@ bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn, try { // All YIELD_AUTO plans will get here eventually when the elapsed tracker triggers // that it's time to yield. Whether or not we will actually yield, we need to check - // if this operation has been interrupted. Throws if the interrupt flag is set. + // if this operation has been interrupted. if (_policy == PlanExecutor::YIELD_AUTO) { - opCtx->checkForInterrupt(); + auto interruptStatus = opCtx->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + return interruptStatus; + } } try { diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index cdf93f0219a..154a4b13d74 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -40,6 +40,8 @@ class RecordFetcher; class PlanYieldPolicy { public: + virtual ~PlanYieldPolicy() {} + PlanYieldPolicy(PlanExecutor* exec, PlanExecutor::YieldPolicy policy); /** @@ -54,7 +56,7 @@ public: * PlanExecutors give up their locks periodically in order to be fair to other * threads. */ - bool shouldYield(); + virtual bool shouldYield(); /** * Resets the yield timer so that we wait for a while before yielding again. @@ -69,16 +71,18 @@ public: * that we will page fault on this record. We use 'fetcher' to retrieve the record * after we give up our locks. * - * Returns true if the executor was restored successfully and is still alive. Returns false - * if the executor got killed during yield. + * Returns Status::OK() if the executor was restored successfully and is still alive. Returns + * ErrorCodes::QueryPlanKilled if the executor got killed during yield, and + * ErrorCodes::ExceededTimeLimit if the operation has exceeded the time limit. */ - bool yield(RecordFetcher* fetcher = NULL); + virtual Status yield(RecordFetcher* fetcher = NULL); /** * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored. */ - bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn); + virtual Status yield(stdx::function<void()> beforeYieldingFn, + stdx::function<void()> whileYieldingFn); /** * All calls to shouldYield() will return true until the next call to yield. @@ -93,7 +97,19 @@ public: * during this PlanExecutor's lifetime. */ bool canReleaseLocksDuringExecution() const { - return _policy == PlanExecutor::YIELD_AUTO || _policy == PlanExecutor::YIELD_MANUAL; + switch (_policy) { + case PlanExecutor::YIELD_AUTO: + case PlanExecutor::YIELD_MANUAL: + case PlanExecutor::ALWAYS_TIME_OUT: + case PlanExecutor::ALWAYS_MARK_KILLED: { + return true; + } + case PlanExecutor::NO_YIELD: + case PlanExecutor::WRITE_CONFLICT_RETRY_ONLY: { + return false; + } + } + MONGO_UNREACHABLE; } /** @@ -102,8 +118,18 @@ public: * locks. */ bool canAutoYield() const { - return _policy == PlanExecutor::YIELD_AUTO || - _policy == PlanExecutor::WRITE_CONFLICT_RETRY_ONLY; + switch (_policy) { + case PlanExecutor::YIELD_AUTO: + case PlanExecutor::WRITE_CONFLICT_RETRY_ONLY: + case PlanExecutor::ALWAYS_TIME_OUT: + case PlanExecutor::ALWAYS_MARK_KILLED: { + return true; + } + case PlanExecutor::NO_YIELD: + case PlanExecutor::YIELD_MANUAL: + return false; + } + MONGO_UNREACHABLE; } PlanExecutor::YieldPolicy getPolicy() const { diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 4af64c6be73..f5bd327705f 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -34,6 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/dependencies.h" @@ -43,12 +44,15 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/stage_builder.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/util/scopeguard.h" -namespace DocumentSourceCursorTests { +namespace mongo { +namespace { using boost::intrusive_ptr; using std::unique_ptr; @@ -65,28 +69,16 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { return arr[0].getDocument().toBson(); } -class CollectionBase { +class DocumentSourceCursorTest : public unittest::Test { public: - CollectionBase() : client(&_opCtx) {} - - ~CollectionBase() { - client.dropCollection(nss.ns()); + DocumentSourceCursorTest() + : client(_opCtx.get()), + _ctx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) { + _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; } -protected: - const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_opCtxPtr; - DBDirectClient client; -}; - -namespace DocumentSourceCursor { - -using mongo::DocumentSourceCursor; - -class Base : public CollectionBase { -public: - Base() : _ctx(new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, {}))) { - _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; + virtual ~DocumentSourceCursorTest() { + client.dropCollection(nss.ns()); } protected: @@ -94,16 +86,16 @@ protected: // clean up first if this was called before _source.reset(); - OldClientWriteContext ctx(&_opCtx, nss.ns()); + OldClientWriteContext ctx(opCtx(), nss.ns()); auto qr = stdx::make_unique<QueryRequest>(nss); if (hint) { qr->setHint(*hint); } - auto cq = uassertStatusOK(CanonicalQuery::canonicalize(&_opCtx, std::move(qr))); + auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); auto exec = uassertStatusOK( - getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD)); + getExecutor(opCtx(), ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD)); exec->saveState(); _source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx); @@ -117,6 +109,14 @@ protected: return _source.get(); } + OperationContext* opCtx() { + return _opCtx.get(); + } + +protected: + const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext(); + DBDirectClient client; + private: // It is important that these are ordered to ensure correct destruction order. intrusive_ptr<ExpressionContextForTest> _ctx; @@ -124,78 +124,66 @@ private: }; /** Create a DocumentSourceCursor. */ -class Empty : public Base { -public: - void run() { - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The collection is empty, so the source produces no results. - ASSERT(source()->getNext().isEOF()); - // Exhausting the source releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - } -}; +TEST_F(DocumentSourceCursorTest, Empty) { + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The collection is empty, so the source produces no results. + ASSERT(source()->getNext().isEOF()); + // Exhausting the source releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); +} /** Iterate a DocumentSourceCursor. */ -class Iterate : public Base { -public: - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The cursor will produce the expected result. - auto next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); - // There are no more results. - ASSERT(source()->getNext().isEOF()); - // Exhausting the source releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - } -}; +TEST_F(DocumentSourceCursorTest, Iterate) { + client.insert(nss.ns(), BSON("a" << 1)); + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The cursor will produce the expected result. + auto next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); + // There are no more results. + ASSERT(source()->getNext().isEOF()); + // Exhausting the source releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); +} /** Dispose of a DocumentSourceCursor. */ -class Dispose : public Base { -public: - void run() { - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - source()->dispose(); - // Releasing the cursor releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The source is marked as exhausted. - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, Dispose) { + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + source()->dispose(); + // Releasing the cursor releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The source is marked as exhausted. + ASSERT(source()->getNext().isEOF()); +} /** Iterate a DocumentSourceCursor and then dispose of it. */ -class IterateDispose : public Base { -public: - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - client.insert(nss.ns(), BSON("a" << 2)); - client.insert(nss.ns(), BSON("a" << 3)); - createSource(); - // The result is as expected. - auto next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); - // The next result is as expected. - next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a")); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - source()->dispose(); - // Disposing of the source releases the lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The source cannot be advanced further. - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, IterateDispose) { + client.insert(nss.ns(), BSON("a" << 1)); + client.insert(nss.ns(), BSON("a" << 2)); + client.insert(nss.ns(), BSON("a" << 3)); + createSource(); + // The result is as expected. + auto next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); + // The next result is as expected. + next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a")); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + source()->dispose(); + // Disposing of the source releases the lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The source cannot be advanced further. + ASSERT(source()->getNext().isEOF()); +} /** Set a value or await an expected value. */ class PendingValue { @@ -221,148 +209,267 @@ private: /** Test coalescing a limit into a cursor */ -class LimitCoalesce : public Base { -public: - intrusive_ptr<DocumentSourceLimit> mkLimit(long long limit) { - return DocumentSourceLimit::create(ctx(), limit); - } - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - client.insert(nss.ns(), BSON("a" << 2)); - client.insert(nss.ns(), BSON("a" << 3)); - createSource(); - - Pipeline::SourceContainer container; - container.push_back(source()); - container.push_back(mkLimit(10)); - source()->optimizeAt(container.begin(), &container); - - // initial limit becomes limit of cursor - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 10); - - container.push_back(mkLimit(2)); - source()->optimizeAt(container.begin(), &container); - // smaller limit lowers cursor limit - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 2); - - container.push_back(mkLimit(3)); - source()->optimizeAt(container.begin(), &container); - // higher limit doesn't effect cursor limit - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 2); - - // The cursor allows exactly 2 documents through - ASSERT(source()->getNext().isAdvanced()); - ASSERT(source()->getNext().isAdvanced()); - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, LimitCoalesce) { + client.insert(nss.ns(), BSON("a" << 1)); + client.insert(nss.ns(), BSON("a" << 2)); + client.insert(nss.ns(), BSON("a" << 3)); + createSource(); + + Pipeline::SourceContainer container; + container.push_back(source()); + container.push_back(DocumentSourceLimit::create(ctx(), 10)); + source()->optimizeAt(container.begin(), &container); + + // initial limit becomes limit of cursor + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 10); + + container.push_back(DocumentSourceLimit::create(ctx(), 2)); + source()->optimizeAt(container.begin(), &container); + // smaller limit lowers cursor limit + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 2); + + container.push_back(DocumentSourceLimit::create(ctx(), 3)); + source()->optimizeAt(container.begin(), &container); + // higher limit doesn't effect cursor limit + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 2); + + // The cursor allows exactly 2 documents through + ASSERT(source()->getNext().isAdvanced()); + ASSERT(source()->getNext().isAdvanced()); + ASSERT(source()->getNext().isEOF()); +} // // Test cursor output sort. // -class CollectionScanProvidesNoSort : public Base { -public: - void run() { - createSource(BSON("$natural" << 1)); - ASSERT_EQ(source()->getOutputSorts().size(), 0U); - source()->dispose(); - } -}; - -class IndexScanProvidesSortOnKeys : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << 1)); - createSource(BSON("a" << 1)); +TEST_F(DocumentSourceCursorTest, CollectionScanProvidesNoSort) { + createSource(BSON("$natural" << 1)); + ASSERT_EQ(source()->getOutputSorts().size(), 0U); + source()->dispose(); +} - ASSERT_EQ(source()->getOutputSorts().size(), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); - source()->dispose(); - } -}; +TEST_F(DocumentSourceCursorTest, IndexScanProvidesSortOnKeys) { + client.createIndex(nss.ns(), BSON("a" << 1)); + createSource(BSON("a" << 1)); -class ReverseIndexScanProvidesSort : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << -1)); - createSource(BSON("a" << -1)); + ASSERT_EQ(source()->getOutputSorts().size(), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); + source()->dispose(); +} - ASSERT_EQ(source()->getOutputSorts().size(), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U); - source()->dispose(); - } -}; +TEST_F(DocumentSourceCursorTest, ReverseIndexScanProvidesSort) { + client.createIndex(nss.ns(), BSON("a" << -1)); + createSource(BSON("a" << -1)); -class CompoundIndexScanProvidesMultipleSorts : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1)); - createSource(BSON("a" << 1 << "b" << -1)); - - ASSERT_EQ(source()->getOutputSorts().size(), 2U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U); - source()->dispose(); - } -}; + ASSERT_EQ(source()->getOutputSorts().size(), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U); + source()->dispose(); +} -class SerializationRespectsExplainModes : public Base { -public: - void run() { - createSource(); +TEST_F(DocumentSourceCursorTest, CompoundIndexScanProvidesMultipleSorts) { + client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1)); + createSource(BSON("a" << 1 << "b" << -1)); - { - // Nothing serialized when no explain mode specified. - auto explainResult = source()->serialize(); - ASSERT_TRUE(explainResult.missing()); - } + ASSERT_EQ(source()->getOutputSorts().size(), 2U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U); + source()->dispose(); +} - { - auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing()); - } +TEST_F(DocumentSourceCursorTest, SerializationRespectsExplainModes) { + createSource(); - { - auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); - ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); - } + { + // Nothing serialized when no explain mode specified. + auto explainResult = source()->serialize(); + ASSERT_TRUE(explainResult.missing()); + } - { - auto explainResult = - source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument(); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); - } - source()->dispose(); + { + auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing()); } -}; -} // namespace DocumentSourceCursor + { + auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); + ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); + } -class All : public Suite { -public: - All() : Suite("documentsource") {} - void setupTests() { - add<DocumentSourceCursor::Empty>(); - add<DocumentSourceCursor::Iterate>(); - add<DocumentSourceCursor::Dispose>(); - add<DocumentSourceCursor::IterateDispose>(); - add<DocumentSourceCursor::LimitCoalesce>(); - add<DocumentSourceCursor::CollectionScanProvidesNoSort>(); - add<DocumentSourceCursor::IndexScanProvidesSortOnKeys>(); - add<DocumentSourceCursor::ReverseIndexScanProvidesSort>(); - add<DocumentSourceCursor::CompoundIndexScanProvidesMultipleSorts>(); - add<DocumentSourceCursor::SerializationRespectsExplainModes>(); + { + auto explainResult = + source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument(); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); } -}; + source()->dispose(); +} -SuiteInstance<All> myall; +TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + const bool capped = true; + const bool cappedSize = 1024; + ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped)); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + collScanParams.tailable = true; + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + queryRequest->setTailable(true); + queryRequest->setAwaitData(true); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ASSERT(cursor->getNext().isEOF()); + cursor->dispose(); +} + +TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + ASSERT_TRUE(client.createCollection(nss.ns())); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} + +TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKilled) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + const bool capped = true; + const bool cappedSize = 1024; + ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped)); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + collScanParams.tailable = true; + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} + +TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + ASSERT_TRUE(client.createCollection(nss.ns())); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} -} // namespace DocumentSourceCursorTests +} // namespace +} // namespace mongo diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp index 8d961c51eff..044499c4416 100644 --- a/src/mongo/dbtests/executor_registry.cpp +++ b/src/mongo/dbtests/executor_registry.cpp @@ -137,7 +137,7 @@ public: // At this point, we're done yielding. We recover our lock. // And clean up anything that happened before. - exec->restoreState(); + ASSERT_OK(exec->restoreState()); // Make sure that the PlanExecutor moved forward over the deleted data. We don't see // foo==10 @@ -164,14 +164,12 @@ public: ASSERT_EQUALS(i, obj["foo"].numberInt()); } - // Save state and register. exec->saveState(); // Drop a collection that's not ours. _client.dropCollection("unittests.someboguscollection"); - // Unregister and restore state. - exec->restoreState(); + ASSERT_OK(exec->restoreState()); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(10, obj["foo"].numberInt()); @@ -180,10 +178,7 @@ public: _client.dropCollection(nss.ns()); - exec->restoreState(); - - // PlanExecutor was killed. - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&obj, NULL)); + ASSERT_EQUALS(ErrorCodes::QueryPlanKilled, exec->restoreState()); } }; @@ -204,8 +199,7 @@ public: exec->saveState(); _client.dropIndexes(nss.ns()); - exec->restoreState(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&obj, NULL)); + ASSERT_EQUALS(ErrorCodes::QueryPlanKilled, exec->restoreState()); } }; @@ -226,8 +220,7 @@ public: exec->saveState(); _client.dropIndex(nss.ns(), BSON("foo" << 1)); - exec->restoreState(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&obj, NULL)); + ASSERT_EQUALS(ErrorCodes::QueryPlanKilled, exec->restoreState()); } }; @@ -251,7 +244,7 @@ public: _ctx.reset(); _client.dropDatabase("somesillydb"); _ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns())); - exec->restoreState(); + ASSERT_OK(exec->restoreState()); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(10, obj["foo"].numberInt()); @@ -262,11 +255,7 @@ public: _ctx.reset(); _client.dropDatabase("unittests"); _ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns())); - exec->restoreState(); - _ctx.reset(); - - // PlanExecutor was killed. - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&obj, NULL)); + ASSERT_EQUALS(ErrorCodes::QueryPlanKilled, exec->restoreState()); } }; diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 4b1a1eef0e4..99f4548b631 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -53,7 +53,8 @@ #include "mongo/dbtests/dbtests.h" #include "mongo/stdx/memory.h" -namespace QueryPlanExecutor { +namespace mongo { +namespace { using std::shared_ptr; using std::string; @@ -62,11 +63,11 @@ using stdx::make_unique; static const NamespaceString nss("unittests.QueryPlanExecutor"); -class PlanExecutorBase { +class PlanExecutorTest : public unittest::Test { public: - PlanExecutorBase() : _client(&_opCtx) {} + PlanExecutorTest() : _client(&_opCtx) {} - virtual ~PlanExecutorBase() { + virtual ~PlanExecutorTest() { _client.dropCollection(nss.ns()); } @@ -94,8 +95,12 @@ public: * Given a match expression, represented as the BSON object 'filterObj', create a PlanExecutor * capable of executing a simple collection scan. */ - unique_ptr<PlanExecutor, PlanExecutor::Deleter> makeCollScanExec(Collection* coll, - BSONObj& filterObj) { + unique_ptr<PlanExecutor, PlanExecutor::Deleter> makeCollScanExec( + Collection* coll, + BSONObj& filterObj, + PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL, + bool tailable = false, + bool awaitData = false) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; @@ -104,8 +109,10 @@ public: // Canonicalize the query. auto qr = stdx::make_unique<QueryRequest>(nss); qr->setFilter(filterObj); + qr->setTailable(tailable); + qr->setAwaitData(awaitData); auto statusWithCQ = CanonicalQuery::canonicalize(&_opCtx, std::move(qr)); - verify(statusWithCQ.isOK()); + ASSERT_OK(statusWithCQ.getStatus()); unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); verify(NULL != cq.get()); @@ -114,12 +121,8 @@ public: new CollectionScan(&_opCtx, csparams, ws.get(), cq.get()->root())); // Hand the plan off to the executor. - auto statusWithPlanExecutor = PlanExecutor::make(&_opCtx, - std::move(ws), - std::move(root), - std::move(cq), - coll, - PlanExecutor::YIELD_MANUAL); + auto statusWithPlanExecutor = PlanExecutor::make( + &_opCtx, std::move(ws), std::move(root), std::move(cq), coll, yieldPolicy); ASSERT_OK(statusWithPlanExecutor.getStatus()); return std::move(statusWithPlanExecutor.getValue()); } @@ -191,105 +194,160 @@ private: * Test dropping the collection while the * PlanExecutor is doing a collection scan. */ -class DropCollScan : public PlanExecutorBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - insert(BSON("_id" << 1)); - insert(BSON("_id" << 2)); +TEST_F(PlanExecutorTest, DropCollScan) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); - BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); - Collection* coll = ctx.getCollection(); - auto exec = makeCollScanExec(coll, filterObj); + Collection* coll = ctx.getCollection(); + auto exec = makeCollScanExec(coll, filterObj); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(1, objOut["_id"].numberInt()); + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(1, objOut["_id"].numberInt()); - // After dropping the collection, the plan executor should be dead. - dropCollection(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); - } -}; + // After dropping the collection, the plan executor should be dead. + dropCollection(); + ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); +} /** * Test dropping the collection while the PlanExecutor is doing an index scan. */ -class DropIndexScan : public PlanExecutorBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - insert(BSON("_id" << 1 << "a" << 6)); - insert(BSON("_id" << 2 << "a" << 7)); - insert(BSON("_id" << 3 << "a" << 8)); - BSONObj indexSpec = BSON("a" << 1); - addIndex(indexSpec); +TEST_F(PlanExecutorTest, DropIndexScan) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1 << "a" << 6)); + insert(BSON("_id" << 2 << "a" << 7)); + insert(BSON("_id" << 3 << "a" << 8)); + BSONObj indexSpec = BSON("a" << 1); + addIndex(indexSpec); - auto exec = makeIndexScanExec(ctx.db(), indexSpec, 7, 10); + auto exec = makeIndexScanExec(ctx.db(), indexSpec, 7, 10); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(7, objOut["a"].numberInt()); + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(7, objOut["a"].numberInt()); - // After dropping the collection, the plan executor should be dead. - dropCollection(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); - } -}; + // After dropping the collection, the plan executor should be dead. + dropCollection(); + ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); +} /** * Test dropping the collection while an agg PlanExecutor is doing an index scan. */ -class DropIndexScanAgg : public PlanExecutorBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - - insert(BSON("_id" << 1 << "a" << 6)); - insert(BSON("_id" << 2 << "a" << 7)); - insert(BSON("_id" << 3 << "a" << 8)); - BSONObj indexSpec = BSON("a" << 1); - addIndex(indexSpec); - - Collection* collection = ctx.getCollection(); - - // Create the aggregation pipeline. - std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")}; - boost::intrusive_ptr<ExpressionContextForTest> expCtx = - new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, rawPipeline)); - - // Create an "inner" plan executor and register it with the cursor manager so that it can - // get notified when the collection is dropped. - unique_ptr<PlanExecutor, PlanExecutor::Deleter> innerExec( - makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); - - // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source - // in the pipeline. - innerExec->saveState(); - auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx); - auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx)); +TEST_F(PlanExecutorTest, DropIndexScanAgg) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + + insert(BSON("_id" << 1 << "a" << 6)); + insert(BSON("_id" << 2 << "a" << 7)); + insert(BSON("_id" << 3 << "a" << 8)); + BSONObj indexSpec = BSON("a" << 1); + addIndex(indexSpec); + + Collection* collection = ctx.getCollection(); + + // Create the aggregation pipeline. + std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")}; + boost::intrusive_ptr<ExpressionContextForTest> expCtx = + new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, rawPipeline)); + + // Create an "inner" plan executor and register it with the cursor manager so that it can + // get notified when the collection is dropped. + unique_ptr<PlanExecutor, PlanExecutor::Deleter> innerExec( + makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); + + // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source + // in the pipeline. + innerExec->saveState(); + auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx); + auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx)); + + // Create the output PlanExecutor that pulls results from the pipeline. + auto ws = make_unique<WorkingSet>(); + auto proxy = make_unique<PipelineProxyStage>(&_opCtx, std::move(pipeline), ws.get()); + + auto statusWithPlanExecutor = PlanExecutor::make( + &_opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::NO_YIELD); + ASSERT_OK(statusWithPlanExecutor.getStatus()); + auto outerExec = std::move(statusWithPlanExecutor.getValue()); + + dropCollection(); + + // Verify that the aggregation pipeline returns an error because its "inner" plan executor + // has been killed due to the collection being dropped. + BSONObj objOut; + ASSERT_THROWS_CODE( + outerExec->getNext(&objOut, nullptr), AssertionException, ErrorCodes::QueryPlanKilled); +} + +TEST_F(PlanExecutorTest, ShouldReportErrorIfExceedsTimeLimitDuringYield) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); + + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); + + Collection* coll = ctx.getCollection(); + auto exec = makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT); + + BSONObj resultObj; + ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, WorkingSetCommon::getMemberObjectStatus(resultObj)); +} + +TEST_F(PlanExecutorTest, ShouldReportEOFIfExceedsTimeLimitDuringYieldButIsTailableAndAwaitData) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); + + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); + + Collection* coll = ctx.getCollection(); + const bool tailable = true; + const bool awaitData = true; + auto exec = makeCollScanExec( + coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable, awaitData); + + BSONObj resultObj; + ASSERT_EQ(PlanExecutor::IS_EOF, exec->getNext(&resultObj, nullptr)); +} + +TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailableButNotAwaitData) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); + + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); + + Collection* coll = ctx.getCollection(); + const bool tailable = true; + auto exec = + makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable); + + BSONObj resultObj; + ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, WorkingSetCommon::getMemberObjectStatus(resultObj)); +} + +TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYield) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); + + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); + + Collection* coll = ctx.getCollection(); + auto exec = makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED); + + BSONObj resultObj; + ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); + ASSERT_EQ(ErrorCodes::QueryPlanKilled, WorkingSetCommon::getMemberObjectStatus(resultObj)); +} - // Create the output PlanExecutor that pulls results from the pipeline. - auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(&_opCtx, std::move(pipeline), ws.get()); - - auto statusWithPlanExecutor = PlanExecutor::make( - &_opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::NO_YIELD); - ASSERT_OK(statusWithPlanExecutor.getStatus()); - auto outerExec = std::move(statusWithPlanExecutor.getValue()); - - dropCollection(); - - // Verify that the aggregation pipeline returns an error because its "inner" plan executor - // has been killed due to the collection being dropped. - BSONObj objOut; - ASSERT_THROWS_CODE( - outerExec->getNext(&objOut, nullptr), AssertionException, ErrorCodes::QueryPlanKilled); - } -}; - -class SnapshotBase : public PlanExecutorBase { +class PlanExecutorSnapshotTest : public PlanExecutorTest { protected: void setupCollection() { insert(BSON("_id" << 1 << "a" << 1)); @@ -338,70 +396,50 @@ protected: * twice due to a concurrent document move and collection * scan. */ -class SnapshotControl : public SnapshotBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - setupCollection(); +TEST_F(PlanExecutorSnapshotTest, SnapshotControl) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + setupCollection(); - BSONObj filterObj = fromjson("{a: {$gte: 2}}"); + BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - Collection* coll = ctx.getCollection(); - auto exec = makeCollScanExec(coll, filterObj); + Collection* coll = ctx.getCollection(); + auto exec = makeCollScanExec(coll, filterObj); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(2, objOut["a"].numberInt()); + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(2, objOut["a"].numberInt()); - forceDocumentMove(); + forceDocumentMove(); - int ids[] = {3, 4, 2}; - checkIds(ids, exec.get()); - } -}; + int ids[] = {3, 4, 2}; + checkIds(ids, exec.get()); +} /** * A snapshot is really just a hint that means scan the _id index. * Make sure that we do not see the document move with an _id * index scan. */ -class SnapshotTest : public SnapshotBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - setupCollection(); - BSONObj indexSpec = BSON("_id" << 1); - addIndex(indexSpec); +TEST_F(PlanExecutorSnapshotTest, SnapshotTest) { + OldClientWriteContext ctx(&_opCtx, nss.ns()); + setupCollection(); + BSONObj indexSpec = BSON("_id" << 1); + addIndex(indexSpec); - BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - auto exec = makeIndexScanExec(ctx.db(), indexSpec, 2, 5); + BSONObj filterObj = fromjson("{a: {$gte: 2}}"); + auto exec = makeIndexScanExec(ctx.db(), indexSpec, 2, 5); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(2, objOut["a"].numberInt()); - - forceDocumentMove(); + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(2, objOut["a"].numberInt()); - // Since this time we're scanning the _id index, - // we should not see the moved document again. - int ids[] = {3, 4}; - checkIds(ids, exec.get()); - } -}; - -class All : public Suite { -public: - All() : Suite("query_plan_executor") {} - - void setupTests() { - add<DropCollScan>(); - add<DropIndexScan>(); - add<DropIndexScanAgg>(); - add<SnapshotControl>(); - add<SnapshotTest>(); - } -}; + forceDocumentMove(); -SuiteInstance<All> queryPlanExecutorAll; + // Since this time we're scanning the _id index, + // we should not see the moved document again. + int ids[] = {3, 4}; + checkIds(ids, exec.get()); +} -} // namespace QueryPlanExecutor +} // namespace +} // namespace mongo diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp index 0f02e120110..dd1fc50b924 100644 --- a/src/mongo/dbtests/query_stage_multiplan.cpp +++ b/src/mongo/dbtests/query_stage_multiplan.cpp @@ -44,6 +44,7 @@ #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_knobs.h" @@ -61,9 +62,7 @@ const std::unique_ptr<ClockSource> clockSource = stdx::make_unique<ClockSourceMo // How we access the external setParameter testing bool. extern AtomicBool internalQueryForceIntersectionPlans; -} // namespace mongo - -namespace QueryStageMultiPlan { +namespace { using std::unique_ptr; using std::vector; @@ -82,40 +81,43 @@ QuerySolution* createQuerySolution() { return soln.release(); } -class QueryStageMultiPlanBase { +class QueryStageMultiPlanTest : public unittest::Test { public: - QueryStageMultiPlanBase() : _client(&_opCtx) { - OldClientWriteContext ctx(&_opCtx, nss.ns()); + QueryStageMultiPlanTest() : _client(_opCtx.get()) { + OldClientWriteContext ctx(_opCtx.get(), nss.ns()); _client.dropCollection(nss.ns()); } - virtual ~QueryStageMultiPlanBase() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); + virtual ~QueryStageMultiPlanTest() { + OldClientWriteContext ctx(_opCtx.get(), nss.ns()); _client.dropCollection(nss.ns()); } void addIndex(const BSONObj& obj) { - ASSERT_OK(dbtests::createIndex(&_opCtx, nss.ns(), obj)); + ASSERT_OK(dbtests::createIndex(_opCtx.get(), nss.ns(), obj)); } void insert(const BSONObj& obj) { - OldClientWriteContext ctx(&_opCtx, nss.ns()); + OldClientWriteContext ctx(_opCtx.get(), nss.ns()); _client.insert(nss.ns(), obj); } void remove(const BSONObj& obj) { - OldClientWriteContext ctx(&_opCtx, nss.ns()); + OldClientWriteContext ctx(_opCtx.get(), nss.ns()); _client.remove(nss.ns(), obj); } OperationContext* opCtx() { - return &_opCtx; + return _opCtx.get(); + } + + ServiceContext* serviceContext() { + return _opCtx->getServiceContext(); } protected: - const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_txnPtr; - ClockSource* const _clock = _opCtx.getServiceContext()->getFastClockSource(); + const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext(); + ClockSource* const _clock = _opCtx->getServiceContext()->getFastClockSource(); DBDirectClient _client; }; @@ -123,326 +125,424 @@ protected: // Basic ranking test: collection scan vs. highly selective index scan. Make sure we also get // all expected results out as well. -class MPSCollectionScanVsHighlySelectiveIXScan : public QueryStageMultiPlanBase { -public: - void run() { - const int N = 5000; - for (int i = 0; i < N; ++i) { - insert(BSON("foo" << (i % 10))); - } +TEST_F(QueryStageMultiPlanTest, MPSCollectionScanVsHighlySelectiveIXScan) { + const int N = 5000; + for (int i = 0; i < N; ++i) { + insert(BSON("foo" << (i % 10))); + } - addIndex(BSON("foo" << 1)); - - AutoGetCollectionForReadCommand ctx(&_opCtx, nss); - const Collection* coll = ctx.getCollection(); - - // Plan 0: IXScan over foo == 7 - // Every call to work() returns something so this should clearly win (by current scoring - // at least). - std::vector<IndexDescriptor*> indexes; - coll->getIndexCatalog()->findIndexesByKeyPattern( - &_opCtx, BSON("foo" << 1), false, &indexes); - ASSERT_EQ(indexes.size(), 1U); - - IndexScanParams ixparams; - ixparams.descriptor = indexes[0]; - ixparams.bounds.isSimpleRange = true; - ixparams.bounds.startKey = BSON("" << 7); - ixparams.bounds.endKey = BSON("" << 7); - ixparams.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys; - ixparams.direction = 1; - - unique_ptr<WorkingSet> sharedWs(new WorkingSet()); - IndexScan* ix = new IndexScan(&_opCtx, ixparams, sharedWs.get(), NULL); - unique_ptr<PlanStage> firstRoot(new FetchStage(&_opCtx, sharedWs.get(), ix, NULL, coll)); - - // Plan 1: CollScan with matcher. - CollectionScanParams csparams; - csparams.collection = coll; - csparams.direction = CollectionScanParams::FORWARD; - - // Make the filter. - BSONObj filterObj = BSON("foo" << 7); - const CollatorInterface* collator = nullptr; - StatusWithMatchExpression statusWithMatcher = - MatchExpressionParser::parse(filterObj, collator); - verify(statusWithMatcher.isOK()); - unique_ptr<MatchExpression> filter = std::move(statusWithMatcher.getValue()); - // Make the stage. - unique_ptr<PlanStage> secondRoot( - new CollectionScan(&_opCtx, csparams, sharedWs.get(), filter.get())); - - // Hand the plans off to the MPS. - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(BSON("foo" << 7)); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - verify(statusWithCQ.isOK()); - unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - verify(NULL != cq.get()); - - unique_ptr<MultiPlanStage> mps = - make_unique<MultiPlanStage>(&_opCtx, ctx.getCollection(), cq.get()); - mps->addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get()); - mps->addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get()); - - // Plan 0 aka the first plan aka the index scan should be the best. - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - mps->pickBestPlan(&yieldPolicy).transitional_ignore(); - ASSERT(mps->bestPlanChosen()); - ASSERT_EQUALS(0, mps->bestPlanIdx()); - - // Takes ownership of arguments other than 'collection'. - auto statusWithPlanExecutor = PlanExecutor::make(&_opCtx, - std::move(sharedWs), - std::move(mps), - std::move(cq), - coll, - PlanExecutor::NO_YIELD); - ASSERT_OK(statusWithPlanExecutor.getStatus()); - auto exec = std::move(statusWithPlanExecutor.getValue()); - - // Get all our results out. - int results = 0; - BSONObj obj; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - ASSERT_EQUALS(obj["foo"].numberInt(), 7); - ++results; - } - ASSERT_EQUALS(PlanExecutor::IS_EOF, state); - ASSERT_EQUALS(results, N / 10); + addIndex(BSON("foo" << 1)); + + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + const Collection* coll = ctx.getCollection(); + + // Plan 0: IXScan over foo == 7 + // Every call to work() returns something so this should clearly win (by current scoring + // at least). + std::vector<IndexDescriptor*> indexes; + coll->getIndexCatalog()->findIndexesByKeyPattern( + _opCtx.get(), BSON("foo" << 1), false, &indexes); + ASSERT_EQ(indexes.size(), 1U); + + IndexScanParams ixparams; + ixparams.descriptor = indexes[0]; + ixparams.bounds.isSimpleRange = true; + ixparams.bounds.startKey = BSON("" << 7); + ixparams.bounds.endKey = BSON("" << 7); + ixparams.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys; + ixparams.direction = 1; + + unique_ptr<WorkingSet> sharedWs(new WorkingSet()); + IndexScan* ix = new IndexScan(_opCtx.get(), ixparams, sharedWs.get(), NULL); + unique_ptr<PlanStage> firstRoot(new FetchStage(_opCtx.get(), sharedWs.get(), ix, NULL, coll)); + + // Plan 1: CollScan with matcher. + CollectionScanParams csparams; + csparams.collection = coll; + csparams.direction = CollectionScanParams::FORWARD; + + // Make the filter. + BSONObj filterObj = BSON("foo" << 7); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression statusWithMatcher = MatchExpressionParser::parse(filterObj, collator); + verify(statusWithMatcher.isOK()); + unique_ptr<MatchExpression> filter = std::move(statusWithMatcher.getValue()); + // Make the stage. + unique_ptr<PlanStage> secondRoot( + new CollectionScan(_opCtx.get(), csparams, sharedWs.get(), filter.get())); + + // Hand the plans off to the MPS. + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(BSON("foo" << 7)); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + verify(statusWithCQ.isOK()); + unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + verify(NULL != cq.get()); + + unique_ptr<MultiPlanStage> mps = + make_unique<MultiPlanStage>(_opCtx.get(), ctx.getCollection(), cq.get()); + mps->addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get()); + mps->addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get()); + + // Plan 0 aka the first plan aka the index scan should be the best. + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(mps->pickBestPlan(&yieldPolicy)); + ASSERT(mps->bestPlanChosen()); + ASSERT_EQUALS(0, mps->bestPlanIdx()); + + // Takes ownership of arguments other than 'collection'. + auto statusWithPlanExecutor = PlanExecutor::make(_opCtx.get(), + std::move(sharedWs), + std::move(mps), + std::move(cq), + coll, + PlanExecutor::NO_YIELD); + ASSERT_OK(statusWithPlanExecutor.getStatus()); + auto exec = std::move(statusWithPlanExecutor.getValue()); + + // Get all our results out. + int results = 0; + BSONObj obj; + PlanExecutor::ExecState state; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { + ASSERT_EQUALS(obj["foo"].numberInt(), 7); + ++results; } -}; + ASSERT_EQUALS(PlanExecutor::IS_EOF, state); + ASSERT_EQUALS(results, N / 10); +} // Case in which we select a blocking plan as the winner, and a non-blocking plan // is available as a backup. -class MPSBackupPlan : public QueryStageMultiPlanBase { -public: - void run() { - // Data is just a single {_id: 1, a: 1, b: 1} document. - insert(BSON("_id" << 1 << "a" << 1 << "b" << 1)); - - // Indices on 'a' and 'b'. - addIndex(BSON("a" << 1)); - addIndex(BSON("b" << 1)); - - AutoGetCollectionForReadCommand ctx(&_opCtx, nss); - Collection* collection = ctx.getCollection(); - - // Query for both 'a' and 'b' and sort on 'b'. - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(BSON("a" << 1 << "b" << 1)); - qr->setSort(BSON("b" << 1)); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - verify(statusWithCQ.isOK()); - unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - ASSERT(NULL != cq.get()); - - // Force index intersection. - bool forceIxisectOldValue = internalQueryForceIntersectionPlans.load(); - internalQueryForceIntersectionPlans.store(true); - - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); - // Turn this off otherwise it pops up in some plans. - plannerParams.options &= ~QueryPlannerParams::KEEP_MUTATIONS; - - // Plan. - vector<QuerySolution*> solutions; - Status status = QueryPlanner::plan(*cq, plannerParams, &solutions); - ASSERT(status.isOK()); - - // We expect a plan using index {a: 1} and plan using index {b: 1} and - // an index intersection plan. - ASSERT_EQUALS(solutions.size(), 3U); - - // Fill out the MultiPlanStage. - unique_ptr<MultiPlanStage> mps(new MultiPlanStage(&_opCtx, collection, cq.get())); - unique_ptr<WorkingSet> ws(new WorkingSet()); - // Put each solution from the planner into the MPR. - for (size_t i = 0; i < solutions.size(); ++i) { - PlanStage* root; - ASSERT(StageBuilder::build(&_opCtx, collection, *cq, *solutions[i], ws.get(), &root)); - // Takes ownership of 'solutions[i]' and 'root'. - mps->addPlan(solutions[i], root, ws.get()); - } +TEST_F(QueryStageMultiPlanTest, MPSBackupPlan) { + // Data is just a single {_id: 1, a: 1, b: 1} document. + insert(BSON("_id" << 1 << "a" << 1 << "b" << 1)); + + // Indices on 'a' and 'b'. + addIndex(BSON("a" << 1)); + addIndex(BSON("b" << 1)); + + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + Collection* collection = ctx.getCollection(); + + // Query for both 'a' and 'b' and sort on 'b'. + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(BSON("a" << 1 << "b" << 1)); + qr->setSort(BSON("b" << 1)); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + verify(statusWithCQ.isOK()); + unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + ASSERT(NULL != cq.get()); + + // Force index intersection. + bool forceIxisectOldValue = internalQueryForceIntersectionPlans.load(); + internalQueryForceIntersectionPlans.store(true); + + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(_opCtx.get(), collection, cq.get(), &plannerParams); + // Turn this off otherwise it pops up in some plans. + plannerParams.options &= ~QueryPlannerParams::KEEP_MUTATIONS; + + // Plan. + vector<QuerySolution*> solutions; + Status status = QueryPlanner::plan(*cq, plannerParams, &solutions); + ASSERT(status.isOK()); + + // We expect a plan using index {a: 1} and plan using index {b: 1} and + // an index intersection plan. + ASSERT_EQUALS(solutions.size(), 3U); + + // Fill out the MultiPlanStage. + unique_ptr<MultiPlanStage> mps(new MultiPlanStage(_opCtx.get(), collection, cq.get())); + unique_ptr<WorkingSet> ws(new WorkingSet()); + // Put each solution from the planner into the MPR. + for (size_t i = 0; i < solutions.size(); ++i) { + PlanStage* root; + ASSERT(StageBuilder::build(_opCtx.get(), collection, *cq, *solutions[i], ws.get(), &root)); + // Takes ownership of 'solutions[i]' and 'root'. + mps->addPlan(solutions[i], root, ws.get()); + } - // This sets a backup plan. - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - mps->pickBestPlan(&yieldPolicy).transitional_ignore(); - ASSERT(mps->bestPlanChosen()); - ASSERT(mps->hasBackupPlan()); - - // We should have picked the index intersection plan due to forcing ixisect. - QuerySolution* soln = mps->bestSolution(); - ASSERT(QueryPlannerTestLib::solutionMatches( - "{sort: {pattern: {b: 1}, limit: 0, node: {sortKeyGen: {node:" - "{fetch: {node: {andSorted: {nodes: [" - "{ixscan: {filter: null, pattern: {a:1}}}," - "{ixscan: {filter: null, pattern: {b:1}}}]}}}}}}}}", - soln->root.get())); - - // Get the resulting document. - PlanStage::StageState state = PlanStage::NEED_TIME; - WorkingSetID wsid; - while (state != PlanStage::ADVANCED) { - state = mps->work(&wsid); - } - WorkingSetMember* member = ws->get(wsid); - - // Check the document returned by the query. - ASSERT(member->hasObj()); - BSONObj expectedDoc = BSON("_id" << 1 << "a" << 1 << "b" << 1); - ASSERT(expectedDoc.woCompare(member->obj.value()) == 0); - - // The blocking plan became unblocked, so we should no longer have a backup plan, - // and the winning plan should still be the index intersection one. - ASSERT(!mps->hasBackupPlan()); - soln = mps->bestSolution(); - ASSERT(QueryPlannerTestLib::solutionMatches( - "{sort: {pattern: {b: 1}, limit: 0, node: {sortKeyGen: {node:" - "{fetch: {node: {andSorted: {nodes: [" - "{ixscan: {filter: null, pattern: {a:1}}}," - "{ixscan: {filter: null, pattern: {b:1}}}]}}}}}}}}", - soln->root.get())); - - // Restore index intersection force parameter. - internalQueryForceIntersectionPlans.store(forceIxisectOldValue); + // This sets a backup plan. + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(mps->pickBestPlan(&yieldPolicy)); + ASSERT(mps->bestPlanChosen()); + ASSERT(mps->hasBackupPlan()); + + // We should have picked the index intersection plan due to forcing ixisect. + QuerySolution* soln = mps->bestSolution(); + ASSERT(QueryPlannerTestLib::solutionMatches( + "{sort: {pattern: {b: 1}, limit: 0, node: {sortKeyGen: {node:" + "{fetch: {node: {andSorted: {nodes: [" + "{ixscan: {filter: null, pattern: {a:1}}}," + "{ixscan: {filter: null, pattern: {b:1}}}]}}}}}}}}", + soln->root.get())); + + // Get the resulting document. + PlanStage::StageState state = PlanStage::NEED_TIME; + WorkingSetID wsid; + while (state != PlanStage::ADVANCED) { + state = mps->work(&wsid); } -}; + WorkingSetMember* member = ws->get(wsid); + + // Check the document returned by the query. + ASSERT(member->hasObj()); + BSONObj expectedDoc = BSON("_id" << 1 << "a" << 1 << "b" << 1); + ASSERT(expectedDoc.woCompare(member->obj.value()) == 0); + + // The blocking plan became unblocked, so we should no longer have a backup plan, + // and the winning plan should still be the index intersection one. + ASSERT(!mps->hasBackupPlan()); + soln = mps->bestSolution(); + ASSERT(QueryPlannerTestLib::solutionMatches( + "{sort: {pattern: {b: 1}, limit: 0, node: {sortKeyGen: {node:" + "{fetch: {node: {andSorted: {nodes: [" + "{ixscan: {filter: null, pattern: {a:1}}}," + "{ixscan: {filter: null, pattern: {b:1}}}]}}}}}}}}", + soln->root.get())); + + // Restore index intersection force parameter. + internalQueryForceIntersectionPlans.store(forceIxisectOldValue); +} -// Test the structure and values of the explain output. -class MPSExplainAllPlans : public QueryStageMultiPlanBase { -public: - void run() { - // Insert a document to create the collection. - insert(BSON("x" << 1)); +/** + * Allocates a new WorkingSetMember with data 'dataObj' in 'ws', and adds the WorkingSetMember + * to 'qds'. + */ +void addMember(QueuedDataStage* qds, WorkingSet* ws, BSONObj dataObj) { + WorkingSetID id = ws->allocate(); + WorkingSetMember* wsm = ws->get(id); + wsm->obj = Snapshotted<BSONObj>(SnapshotId(), BSON("x" << 1)); + wsm->transitionToOwnedObj(); + qds->pushBack(id); +} - const int nDocs = 500; +// Test the structure and values of the explain output. +TEST_F(QueryStageMultiPlanTest, MPSExplainAllPlans) { + // Insert a document to create the collection. + insert(BSON("x" << 1)); - auto ws = stdx::make_unique<WorkingSet>(); - auto firstPlan = stdx::make_unique<QueuedDataStage>(&_opCtx, ws.get()); - auto secondPlan = stdx::make_unique<QueuedDataStage>(&_opCtx, ws.get()); + const int nDocs = 500; - for (int i = 0; i < nDocs; ++i) { - addMember(firstPlan.get(), ws.get(), BSON("x" << 1)); + auto ws = stdx::make_unique<WorkingSet>(); + auto firstPlan = stdx::make_unique<QueuedDataStage>(_opCtx.get(), ws.get()); + auto secondPlan = stdx::make_unique<QueuedDataStage>(_opCtx.get(), ws.get()); - // Make the second plan slower by inserting a NEED_TIME between every result. - addMember(secondPlan.get(), ws.get(), BSON("x" << 1)); - secondPlan->pushBack(PlanStage::NEED_TIME); - } + for (int i = 0; i < nDocs; ++i) { + addMember(firstPlan.get(), ws.get(), BSON("x" << 1)); - AutoGetCollectionForReadCommand ctx(&_opCtx, nss); - - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(BSON("x" << 1)); - auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); - unique_ptr<MultiPlanStage> mps = - make_unique<MultiPlanStage>(&_opCtx, ctx.getCollection(), cq.get()); - - // Put each plan into the MultiPlanStage. Takes ownership of 'firstPlan' and 'secondPlan'. - auto firstSoln = stdx::make_unique<QuerySolution>(); - auto secondSoln = stdx::make_unique<QuerySolution>(); - mps->addPlan(firstSoln.release(), firstPlan.release(), ws.get()); - mps->addPlan(secondSoln.release(), secondPlan.release(), ws.get()); - - // Making a PlanExecutor chooses the best plan. - auto exec = uassertStatusOK(PlanExecutor::make( - &_opCtx, std::move(ws), std::move(mps), ctx.getCollection(), PlanExecutor::NO_YIELD)); - - auto root = static_cast<MultiPlanStage*>(exec->getRootStage()); - ASSERT_TRUE(root->bestPlanChosen()); - // The first QueuedDataStage should have won. - ASSERT_EQ(root->bestPlanIdx(), 0); - - BSONObjBuilder bob; - Explain::explainStages( - exec.get(), ctx.getCollection(), ExplainOptions::Verbosity::kExecAllPlans, &bob); - BSONObj explained = bob.done(); - - ASSERT_EQ(explained["executionStats"]["nReturned"].Int(), nDocs); - ASSERT_EQ(explained["executionStats"]["executionStages"]["needTime"].Int(), 0); - auto allPlansStats = explained["executionStats"]["allPlansExecution"].Array(); - ASSERT_EQ(allPlansStats.size(), 2UL); - for (auto&& planStats : allPlansStats) { - int maxEvaluationResults = internalQueryPlanEvaluationMaxResults.load(); - ASSERT_EQ(planStats["executionStages"]["stage"].String(), "QUEUED_DATA"); - if (planStats["executionStages"]["needTime"].Int() > 0) { - // This is the losing plan. Should only have advanced about half the time. - ASSERT_LT(planStats["nReturned"].Int(), maxEvaluationResults); - } else { - // This is the winning plan. Stats here should be from the trial period. - ASSERT_EQ(planStats["nReturned"].Int(), maxEvaluationResults); - } - } + // Make the second plan slower by inserting a NEED_TIME between every result. + addMember(secondPlan.get(), ws.get(), BSON("x" << 1)); + secondPlan->pushBack(PlanStage::NEED_TIME); } -private: - /** - * Allocates a new WorkingSetMember with data 'dataObj' in 'ws', and adds the WorkingSetMember - * to 'qds'. - */ - void addMember(QueuedDataStage* qds, WorkingSet* ws, BSONObj dataObj) { - WorkingSetID id = ws->allocate(); - WorkingSetMember* wsm = ws->get(id); - wsm->obj = Snapshotted<BSONObj>(SnapshotId(), BSON("x" << 1)); - wsm->transitionToOwnedObj(); - qds->pushBack(id); + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(BSON("x" << 1)); + auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); + unique_ptr<MultiPlanStage> mps = + make_unique<MultiPlanStage>(_opCtx.get(), ctx.getCollection(), cq.get()); + + // Put each plan into the MultiPlanStage. Takes ownership of 'firstPlan' and 'secondPlan'. + auto firstSoln = stdx::make_unique<QuerySolution>(); + auto secondSoln = stdx::make_unique<QuerySolution>(); + mps->addPlan(firstSoln.release(), firstPlan.release(), ws.get()); + mps->addPlan(secondSoln.release(), secondPlan.release(), ws.get()); + + // Making a PlanExecutor chooses the best plan. + auto exec = uassertStatusOK(PlanExecutor::make( + _opCtx.get(), std::move(ws), std::move(mps), ctx.getCollection(), PlanExecutor::NO_YIELD)); + + auto root = static_cast<MultiPlanStage*>(exec->getRootStage()); + ASSERT_TRUE(root->bestPlanChosen()); + // The first QueuedDataStage should have won. + ASSERT_EQ(root->bestPlanIdx(), 0); + + BSONObjBuilder bob; + Explain::explainStages( + exec.get(), ctx.getCollection(), ExplainOptions::Verbosity::kExecAllPlans, &bob); + BSONObj explained = bob.done(); + + ASSERT_EQ(explained["executionStats"]["nReturned"].Int(), nDocs); + ASSERT_EQ(explained["executionStats"]["executionStages"]["needTime"].Int(), 0); + auto allPlansStats = explained["executionStats"]["allPlansExecution"].Array(); + ASSERT_EQ(allPlansStats.size(), 2UL); + for (auto&& planStats : allPlansStats) { + int maxEvaluationResults = internalQueryPlanEvaluationMaxResults.load(); + ASSERT_EQ(planStats["executionStages"]["stage"].String(), "QUEUED_DATA"); + if (planStats["executionStages"]["needTime"].Int() > 0) { + // This is the losing plan. Should only have advanced about half the time. + ASSERT_LT(planStats["nReturned"].Int(), maxEvaluationResults); + } else { + // This is the winning plan. Stats here should be from the trial period. + ASSERT_EQ(planStats["nReturned"].Int(), maxEvaluationResults); + } } -}; +} // Test that the plan summary only includes stats from the winning plan. // // This is a regression test for SERVER-20111. -class MPSSummaryStats : public QueryStageMultiPlanBase { -public: - void run() { - const int N = 5000; - for (int i = 0; i < N; ++i) { - insert(BSON("foo" << (i % 10))); - } +TEST_F(QueryStageMultiPlanTest, MPSSummaryStats) { + const int N = 5000; + for (int i = 0; i < N; ++i) { + insert(BSON("foo" << (i % 10))); + } - // Add two indices to give more plans. - addIndex(BSON("foo" << 1)); - addIndex(BSON("foo" << -1 << "bar" << 1)); + // Add two indices to give more plans. + addIndex(BSON("foo" << 1)); + addIndex(BSON("foo" << -1 << "bar" << 1)); - AutoGetCollectionForReadCommand ctx(&_opCtx, nss); - Collection* coll = ctx.getCollection(); + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + Collection* coll = ctx.getCollection(); - // Create the executor (Matching all documents). - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(BSON("foo" << BSON("$gte" << 0))); - auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); - auto exec = - uassertStatusOK(getExecutor(&_opCtx, coll, std::move(cq), PlanExecutor::NO_YIELD)); + // Create the executor (Matching all documents). + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(BSON("foo" << BSON("$gte" << 0))); + auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); + auto exec = uassertStatusOK(getExecutor(opCtx(), coll, std::move(cq), PlanExecutor::NO_YIELD)); + ASSERT_EQ(exec->getRootStage()->stageType(), STAGE_MULTI_PLAN); - ASSERT_EQ(exec->getRootStage()->stageType(), STAGE_MULTI_PLAN); + ASSERT_OK(exec->executePlan()); - exec->executePlan().transitional_ignore(); + PlanSummaryStats stats; + Explain::getSummaryStats(*exec, &stats); - PlanSummaryStats stats; - Explain::getSummaryStats(*exec, &stats); + // If only the winning plan's stats are recorded, we should not have examined more than the + // total number of documents/index keys. + ASSERT_LTE(stats.totalDocsExamined, static_cast<size_t>(N)); + ASSERT_LTE(stats.totalKeysExamined, static_cast<size_t>(N)); +} - // If only the winning plan's stats are recorded, we should not have examined more than the - // total number of documents/index keys. - ASSERT_LTE(stats.totalDocsExamined, static_cast<size_t>(N)); - ASSERT_LTE(stats.totalKeysExamined, static_cast<size_t>(N)); +TEST_F(QueryStageMultiPlanTest, ShouldReportErrorIfExceedsTimeLimitDuringPlanning) { + const int N = 5000; + for (int i = 0; i < N; ++i) { + insert(BSON("foo" << (i % 10))); } -}; -class All : public Suite { -public: - All() : Suite("query_stage_multiplan") {} + // Add two indices to give more plans. + addIndex(BSON("foo" << 1)); + addIndex(BSON("foo" << -1 << "bar" << 1)); + + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + const auto coll = ctx.getCollection(); + + // Plan 0: IXScan over foo == 7 + // Every call to work() returns something so this should clearly win (by current scoring + // at least). + std::vector<IndexDescriptor*> indexes; + coll->getIndexCatalog()->findIndexesByKeyPattern( + _opCtx.get(), BSON("foo" << 1), false, &indexes); + ASSERT_EQ(indexes.size(), 1U); + + IndexScanParams ixparams; + ixparams.descriptor = indexes[0]; + ixparams.bounds.isSimpleRange = true; + ixparams.bounds.startKey = BSON("" << 7); + ixparams.bounds.endKey = BSON("" << 7); + ixparams.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys; + ixparams.direction = 1; + + unique_ptr<WorkingSet> sharedWs(new WorkingSet()); + IndexScan* ix = new IndexScan(_opCtx.get(), ixparams, sharedWs.get(), NULL); + unique_ptr<PlanStage> firstRoot(new FetchStage(_opCtx.get(), sharedWs.get(), ix, NULL, coll)); + + // Plan 1: CollScan with matcher. + CollectionScanParams csparams; + csparams.collection = coll; + csparams.direction = CollectionScanParams::FORWARD; + + // Make the filter. + BSONObj filterObj = BSON("foo" << 7); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression statusWithMatcher = MatchExpressionParser::parse(filterObj, collator); + verify(statusWithMatcher.isOK()); + unique_ptr<MatchExpression> filter = std::move(statusWithMatcher.getValue()); + // Make the stage. + unique_ptr<PlanStage> secondRoot( + new CollectionScan(_opCtx.get(), csparams, sharedWs.get(), filter.get())); + + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(BSON("foo" << 7)); + auto canonicalQuery = + uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest))); + MultiPlanStage multiPlanStage(opCtx(), + ctx.getCollection(), + canonicalQuery.get(), + MultiPlanStage::CachingMode::NeverCache); + multiPlanStage.addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get()); + multiPlanStage.addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get()); + + AlwaysTimeOutYieldPolicy alwaysTimeOutPolicy(serviceContext()->getFastClockSource()); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, multiPlanStage.pickBestPlan(&alwaysTimeOutPolicy)); +} - void setupTests() { - add<MPSCollectionScanVsHighlySelectiveIXScan>(); - add<MPSBackupPlan>(); - add<MPSExplainAllPlans>(); - add<MPSSummaryStats>(); +TEST_F(QueryStageMultiPlanTest, ShouldReportErrorIfKilledDuringPlanning) { + const int N = 5000; + for (int i = 0; i < N; ++i) { + insert(BSON("foo" << (i % 10))); } -}; -SuiteInstance<All> queryStageMultiPlanAll; + // Add two indices to give more plans. + addIndex(BSON("foo" << 1)); + addIndex(BSON("foo" << -1 << "bar" << 1)); + + AutoGetCollectionForReadCommand ctx(_opCtx.get(), nss); + const auto coll = ctx.getCollection(); + + // Plan 0: IXScan over foo == 7 + // Every call to work() returns something so this should clearly win (by current scoring + // at least). + std::vector<IndexDescriptor*> indexes; + coll->getIndexCatalog()->findIndexesByKeyPattern( + _opCtx.get(), BSON("foo" << 1), false, &indexes); + ASSERT_EQ(indexes.size(), 1U); + + IndexScanParams ixparams; + ixparams.descriptor = indexes[0]; + ixparams.bounds.isSimpleRange = true; + ixparams.bounds.startKey = BSON("" << 7); + ixparams.bounds.endKey = BSON("" << 7); + ixparams.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys; + ixparams.direction = 1; + + unique_ptr<WorkingSet> sharedWs(new WorkingSet()); + IndexScan* ix = new IndexScan(_opCtx.get(), ixparams, sharedWs.get(), NULL); + unique_ptr<PlanStage> firstRoot(new FetchStage(_opCtx.get(), sharedWs.get(), ix, NULL, coll)); + + // Plan 1: CollScan with matcher. + CollectionScanParams csparams; + csparams.collection = coll; + csparams.direction = CollectionScanParams::FORWARD; + + // Make the filter. + BSONObj filterObj = BSON("foo" << 7); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression statusWithMatcher = MatchExpressionParser::parse(filterObj, collator); + verify(statusWithMatcher.isOK()); + unique_ptr<MatchExpression> filter = std::move(statusWithMatcher.getValue()); + // Make the stage. + unique_ptr<PlanStage> secondRoot( + new CollectionScan(_opCtx.get(), csparams, sharedWs.get(), filter.get())); + + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(BSON("foo" << BSON("$gte" << 0))); + auto canonicalQuery = + uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest))); + MultiPlanStage multiPlanStage(opCtx(), + ctx.getCollection(), + canonicalQuery.get(), + MultiPlanStage::CachingMode::NeverCache); + multiPlanStage.addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get()); + multiPlanStage.addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get()); + + AlwaysPlanKilledYieldPolicy alwaysPlanKilledYieldPolicy(serviceContext()->getFastClockSource()); + ASSERT_EQ(ErrorCodes::QueryPlanKilled, + multiPlanStage.pickBestPlan(&alwaysPlanKilledYieldPolicy)); +} -} // namespace QueryStageMultiPlan +} // namespace +} // namespace mongo diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 2c698748b6c..6da81f409a7 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -365,7 +365,7 @@ public: coll->updateDocument(&_opCtx, *it, oldDoc, newDoc(oldDoc), false, false, NULL, &args); wuow.commit(); } - exec->restoreState(); + ASSERT_OK(exec->restoreState()); // Read the rest of the data from the queued data stage. while (!queuedDataStage->isEOF()) { @@ -385,7 +385,7 @@ public: wuow.commit(); } } - exec->restoreState(); + ASSERT_OK(exec->restoreState()); // Verify that it's sorted, the right number of documents are returned, and they're all // in the expected range. @@ -465,7 +465,7 @@ public: coll->deleteDocument(&_opCtx, kUninitializedStmtId, *it++, nullOpDebug); wuow.commit(); } - exec->restoreState(); + ASSERT_OK(exec->restoreState()); // Read the rest of the data from the queued data stage. while (!queuedDataStage->isEOF()) { @@ -482,7 +482,7 @@ public: wuow.commit(); } } - exec->restoreState(); + ASSERT_OK(exec->restoreState()); // Regardless of storage engine, all the documents should come back with their objects int count = 0; diff --git a/src/mongo/dbtests/query_stage_subplan.cpp b/src/mongo/dbtests/query_stage_subplan.cpp index bca6559c25e..3b2e474e35a 100644 --- a/src/mongo/dbtests/query_stage_subplan.cpp +++ b/src/mongo/dbtests/query_stage_subplan.cpp @@ -32,6 +32,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" @@ -42,23 +43,27 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/mock_yield_policies.h" +#include "mongo/db/query/query_test_service_context.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/util/assert_util.h" -namespace QueryStageSubplan { +namespace mongo { +namespace { static const NamespaceString nss("unittests.QueryStageSubplan"); -class QueryStageSubplanBase { +class QueryStageSubplanTest : public unittest::Test { public: - QueryStageSubplanBase() : _client(&_opCtx) {} + QueryStageSubplanTest() : _client(_opCtx.get()) {} - virtual ~QueryStageSubplanBase() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); + virtual ~QueryStageSubplanTest() { + OldClientWriteContext ctx(opCtx(), nss.ns()); _client.dropCollection(nss.ns()); } void addIndex(const BSONObj& obj) { - ASSERT_OK(dbtests::createIndex(&_opCtx, nss.ns(), obj)); + ASSERT_OK(dbtests::createIndex(opCtx(), nss.ns(), obj)); } void insert(const BSONObj& doc) { @@ -66,7 +71,11 @@ public: } OperationContext* opCtx() { - return &_opCtx; + return _opCtx.get(); + } + + ServiceContext* serviceContext() { + return _opCtx->getServiceContext(); } protected: @@ -89,9 +98,8 @@ protected: return cq; } - const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_txnPtr; - ClockSource* _clock = _opCtx.getServiceContext()->getFastClockSource(); + const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext(); + ClockSource* _clock = _opCtx->getServiceContext()->getFastClockSource(); private: DBDirectClient _client; @@ -103,542 +111,559 @@ private: * should gracefully fail after finding that no cache data is available, allowing us to fall * back to regular planning. */ -class QueryStageSubplanGeo2dOr : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - addIndex(BSON("a" - << "2d" - << "b" - << 1)); - addIndex(BSON("a" - << "2d")); - - BSONObj query = fromjson( - "{$or: [{a: {$geoWithin: {$centerSphere: [[0,0],10]}}}," - "{a: {$geoWithin: {$centerSphere: [[1,1],10]}}}]}"); - - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(query); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - ASSERT_OK(statusWithCQ.getStatus()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - - Collection* collection = ctx.getCollection(); - - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); - - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); - - // Plan selection should succeed due to falling back on regular planning. - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - } -}; +TEST_F(QueryStageSubplanTest, QueryStageSubplanGeo2dOr) { + OldClientWriteContext ctx(opCtx(), nss.ns()); + addIndex(BSON("a" + << "2d" + << "b" + << 1)); + addIndex(BSON("a" + << "2d")); + + BSONObj query = fromjson( + "{$or: [{a: {$geoWithin: {$centerSphere: [[0,0],10]}}}," + "{a: {$geoWithin: {$centerSphere: [[1,1],10]}}}]}"); + + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(query); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + ASSERT_OK(statusWithCQ.getStatus()); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + + Collection* collection = ctx.getCollection(); + + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); + + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); + + // Plan selection should succeed due to falling back on regular planning. + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); +} /** * Test the SubplanStage's ability to plan an individual branch using the plan cache. */ -class QueryStageSubplanPlanFromCache : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); +TEST_F(QueryStageSubplanTest, QueryStageSubplanPlanFromCache) { + OldClientWriteContext ctx(opCtx(), nss.ns()); - addIndex(BSON("a" << 1)); - addIndex(BSON("a" << 1 << "b" << 1)); - addIndex(BSON("c" << 1)); + addIndex(BSON("a" << 1)); + addIndex(BSON("a" << 1 << "b" << 1)); + addIndex(BSON("c" << 1)); - for (int i = 0; i < 10; i++) { - insert(BSON("a" << 1 << "b" << i << "c" << i)); - } + for (int i = 0; i < 10; i++) { + insert(BSON("a" << 1 << "b" << i << "c" << i)); + } - // This query should result in a plan cache entry for the first $or branch, because - // there are two competing indices. The second branch has only one relevant index, so - // its winning plan should not be cached. - BSONObj query = fromjson("{$or: [{a: 1, b: 3}, {c: 1}]}"); + // This query should result in a plan cache entry for the first $or branch, because + // there are two competing indices. The second branch has only one relevant index, so + // its winning plan should not be cached. + BSONObj query = fromjson("{$or: [{a: 1, b: 3}, {c: 1}]}"); - Collection* collection = ctx.getCollection(); + Collection* collection = ctx.getCollection(); - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(query); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - ASSERT_OK(statusWithCQ.getStatus()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(query); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + ASSERT_OK(statusWithCQ.getStatus()); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - // Nothing is in the cache yet, so neither branch should have been planned from - // the plan cache. - ASSERT_FALSE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); + // Nothing is in the cache yet, so neither branch should have been planned from + // the plan cache. + ASSERT_FALSE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - // If we repeat the same query, the plan for the first branch should have come from - // the cache. - ws.clear(); - subplan.reset(new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + // If we repeat the same query, the plan for the first branch should have come from + // the cache. + ws.clear(); + subplan.reset(new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - ASSERT_TRUE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - } -}; + ASSERT_TRUE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); +} /** * Ensure that the subplan stage doesn't create a plan cache entry if there are no query results. */ -class QueryStageSubplanDontCacheZeroResults : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); +TEST_F(QueryStageSubplanTest, QueryStageSubplanDontCacheZeroResults) { + OldClientWriteContext ctx(opCtx(), nss.ns()); - addIndex(BSON("a" << 1 << "b" << 1)); - addIndex(BSON("a" << 1)); - addIndex(BSON("c" << 1)); + addIndex(BSON("a" << 1 << "b" << 1)); + addIndex(BSON("a" << 1)); + addIndex(BSON("c" << 1)); - for (int i = 0; i < 10; i++) { - insert(BSON("a" << 1 << "b" << i << "c" << i)); - } + for (int i = 0; i < 10; i++) { + insert(BSON("a" << 1 << "b" << i << "c" << i)); + } - // Running this query should not create any cache entries. For the first branch, it's - // because there are no matching results. For the second branch it's because there is only - // one relevant index. - BSONObj query = fromjson("{$or: [{a: 1, b: 15}, {c: 1}]}"); + // Running this query should not create any cache entries. For the first branch, it's + // because there are no matching results. For the second branch it's because there is only + // one relevant index. + BSONObj query = fromjson("{$or: [{a: 1, b: 15}, {c: 1}]}"); - Collection* collection = ctx.getCollection(); + Collection* collection = ctx.getCollection(); - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(query); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - ASSERT_OK(statusWithCQ.getStatus()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(query); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + ASSERT_OK(statusWithCQ.getStatus()); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - // Nothing is in the cache yet, so neither branch should have been planned from - // the plan cache. - ASSERT_FALSE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); + // Nothing is in the cache yet, so neither branch should have been planned from + // the plan cache. + ASSERT_FALSE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - // If we run the query again, it should again be the case that neither branch gets planned - // from the cache (because the first call to pickBestPlan() refrained from creating any - // cache entries). - ws.clear(); - subplan.reset(new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + // If we run the query again, it should again be the case that neither branch gets planned + // from the cache (because the first call to pickBestPlan() refrained from creating any + // cache entries). + ws.clear(); + subplan.reset(new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - ASSERT_FALSE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - } -}; + ASSERT_FALSE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); +} /** * Ensure that the subplan stage doesn't create a plan cache entry if there are no query results. */ -class QueryStageSubplanDontCacheTies : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); +TEST_F(QueryStageSubplanTest, QueryStageSubplanDontCacheTies) { + OldClientWriteContext ctx(opCtx(), nss.ns()); - addIndex(BSON("a" << 1 << "b" << 1)); - addIndex(BSON("a" << 1 << "c" << 1)); - addIndex(BSON("d" << 1)); + addIndex(BSON("a" << 1 << "b" << 1)); + addIndex(BSON("a" << 1 << "c" << 1)); + addIndex(BSON("d" << 1)); - for (int i = 0; i < 10; i++) { - insert(BSON("a" << 1 << "e" << 1 << "d" << 1)); - } + for (int i = 0; i < 10; i++) { + insert(BSON("a" << 1 << "e" << 1 << "d" << 1)); + } - // Running this query should not create any cache entries. For the first branch, it's - // because plans using the {a: 1, b: 1} and {a: 1, c: 1} indices should tie during plan - // ranking. For the second branch it's because there is only one relevant index. - BSONObj query = fromjson("{$or: [{a: 1, e: 1}, {d: 1}]}"); + // Running this query should not create any cache entries. For the first branch, it's + // because plans using the {a: 1, b: 1} and {a: 1, c: 1} indices should tie during plan + // ranking. For the second branch it's because there is only one relevant index. + BSONObj query = fromjson("{$or: [{a: 1, e: 1}, {d: 1}]}"); - Collection* collection = ctx.getCollection(); + Collection* collection = ctx.getCollection(); - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(query); - auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); - ASSERT_OK(statusWithCQ.getStatus()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(query); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx(), std::move(qr)); + ASSERT_OK(statusWithCQ.getStatus()); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - // Nothing is in the cache yet, so neither branch should have been planned from - // the plan cache. - ASSERT_FALSE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); + // Nothing is in the cache yet, so neither branch should have been planned from + // the plan cache. + ASSERT_FALSE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - // If we run the query again, it should again be the case that neither branch gets planned - // from the cache (because the first call to pickBestPlan() refrained from creating any - // cache entries). - ws.clear(); - subplan.reset(new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); + // If we run the query again, it should again be the case that neither branch gets planned + // from the cache (because the first call to pickBestPlan() refrained from creating any + // cache entries). + ws.clear(); + subplan.reset(new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - ASSERT_FALSE(subplan->branchPlannedFromCache(0)); - ASSERT_FALSE(subplan->branchPlannedFromCache(1)); - } -}; + ASSERT_FALSE(subplan->branchPlannedFromCache(0)); + ASSERT_FALSE(subplan->branchPlannedFromCache(1)); +} /** * Unit test the subplan stage's canUseSubplanning() method. */ -class QueryStageSubplanCanUseSubplanning : public QueryStageSubplanBase { -public: - void run() { - // We won't try and subplan something that doesn't have an $or. - { - std::string findCmd = "{find: 'testns', filter: {$and:[{a:1}, {b:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } +TEST_F(QueryStageSubplanTest, QueryStageSubplanCanUseSubplanning) { + // We won't try and subplan something that doesn't have an $or. + { + std::string findCmd = "{find: 'testns', filter: {$and:[{a:1}, {b:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Don't try and subplan if there is no filter. - { - std::string findCmd = "{find: 'testns'}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Don't try and subplan if there is no filter. + { + std::string findCmd = "{find: 'testns'}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // We won't try and subplan two contained ORs. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or:[{a:1}, {b:1}], $or:[{c:1}, {d:1}], e:1}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // We won't try and subplan two contained ORs. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or:[{a:1}, {b:1}], $or:[{c:1}, {d:1}], e:1}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning if there is a hint. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," - "hint: {a:1, b:1}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning if there is a hint. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," + "hint: {a:1, b:1}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning with min. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," - "min: {a:1, b:1}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning with min. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," + "min: {a:1, b:1}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning with max. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," - "max: {a:2, b:2}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning with max. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," + "max: {a:2, b:2}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning with tailable. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," - "tailable: true}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning with tailable. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," + "tailable: true}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning with snapshot. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," - "snapshot: true}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning with snapshot. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}," + "snapshot: true}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can use subplanning for rooted $or. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_TRUE(SubplanStage::canUseSubplanning(*cq)); - - std::string findCmd2 = - "{find: 'testns'," - "filter: {$or: [{a:1}, {c:1}]}}"; - std::unique_ptr<CanonicalQuery> cq2 = cqFromFindCommand(findCmd2); - ASSERT_TRUE(SubplanStage::canUseSubplanning(*cq2)); - } + // Can use subplanning for rooted $or. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$or: [{a:1, b:1}, {c:1, d:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_TRUE(SubplanStage::canUseSubplanning(*cq)); + + std::string findCmd2 = + "{find: 'testns'," + "filter: {$or: [{a:1}, {c:1}]}}"; + std::unique_ptr<CanonicalQuery> cq2 = cqFromFindCommand(findCmd2); + ASSERT_TRUE(SubplanStage::canUseSubplanning(*cq2)); + } - // Can't use subplanning for a single contained $or. - // - // TODO: Consider allowing this to use subplanning (see SERVER-13732). - { - std::string findCmd = - "{find: 'testns'," - "filter: {e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning for a single contained $or. + // + // TODO: Consider allowing this to use subplanning (see SERVER-13732). + { + std::string findCmd = + "{find: 'testns'," + "filter: {e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning if the contained $or query has a geo predicate. - // - // TODO: Consider allowing this to use subplanning (see SERVER-13732). - { - std::string findCmd = - "{find: 'testns'," - "filter: {loc: {$geoWithin: {$centerSphere: [[0,0], 1]}}," - "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning if the contained $or query has a geo predicate. + // + // TODO: Consider allowing this to use subplanning (see SERVER-13732). + { + std::string findCmd = + "{find: 'testns'," + "filter: {loc: {$geoWithin: {$centerSphere: [[0,0], 1]}}," + "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning if the contained $or query also has a $text predicate. - { - std::string findCmd = - "{find: 'testns'," - "filter: {$text: {$search: 'foo'}," - "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning if the contained $or query also has a $text predicate. + { + std::string findCmd = + "{find: 'testns'," + "filter: {$text: {$search: 'foo'}," + "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); + } - // Can't use subplanning if the contained $or query also has a $near predicate. - { - std::string findCmd = - "{find: 'testns'," - "filter: {loc: {$near: [0, 0]}," - "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; - std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); - ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); - } + // Can't use subplanning if the contained $or query also has a $near predicate. + { + std::string findCmd = + "{find: 'testns'," + "filter: {loc: {$near: [0, 0]}," + "e: 1, $or: [{a:1, b:1}, {c:1, d:1}]}}"; + std::unique_ptr<CanonicalQuery> cq = cqFromFindCommand(findCmd); + ASSERT_FALSE(SubplanStage::canUseSubplanning(*cq)); } -}; +} /** * Unit test the subplan stage's rewriteToRootedOr() method. */ -class QueryStageSubplanRewriteToRootedOr : public QueryStageSubplanBase { -public: - void run() { - // Rewrite (AND (OR a b) e) => (OR (AND a e) (AND b e)) - { - BSONObj queryObj = fromjson("{$or:[{a:1}, {b:1}], e:1}"); - const CollatorInterface* collator = nullptr; - StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); - ASSERT_OK(expr.getStatus()); - std::unique_ptr<MatchExpression> rewrittenExpr = - SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); - - std::string findCmdRewritten = - "{find: 'testns'," - "filter: {$or:[{a:1,e:1}, {b:1,e:1}]}}"; - std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); - - ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); - } +TEST_F(QueryStageSubplanTest, QueryStageSubplanRewriteToRootedOr) { + // Rewrite (AND (OR a b) e) => (OR (AND a e) (AND b e)) + { + BSONObj queryObj = fromjson("{$or:[{a:1}, {b:1}], e:1}"); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); + ASSERT_OK(expr.getStatus()); + std::unique_ptr<MatchExpression> rewrittenExpr = + SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); + + std::string findCmdRewritten = + "{find: 'testns'," + "filter: {$or:[{a:1,e:1}, {b:1,e:1}]}}"; + std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); + + ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); + } - // Rewrite (AND (OR a b) e f) => (OR (AND a e f) (AND b e f)) - { - BSONObj queryObj = fromjson("{$or:[{a:1}, {b:1}], e:1, f:1}"); - const CollatorInterface* collator = nullptr; - StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); - ASSERT_OK(expr.getStatus()); - std::unique_ptr<MatchExpression> rewrittenExpr = - SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); - - std::string findCmdRewritten = - "{find: 'testns'," - "filter: {$or:[{a:1,e:1,f:1}, {b:1,e:1,f:1}]}}"; - std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); - - ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); - } + // Rewrite (AND (OR a b) e f) => (OR (AND a e f) (AND b e f)) + { + BSONObj queryObj = fromjson("{$or:[{a:1}, {b:1}], e:1, f:1}"); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); + ASSERT_OK(expr.getStatus()); + std::unique_ptr<MatchExpression> rewrittenExpr = + SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); + + std::string findCmdRewritten = + "{find: 'testns'," + "filter: {$or:[{a:1,e:1,f:1}, {b:1,e:1,f:1}]}}"; + std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); + + ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); + } - // Rewrite (AND (OR (AND a b) (AND c d) e f) => (OR (AND a b e f) (AND c d e f)) - { - BSONObj queryObj = fromjson("{$or:[{a:1,b:1}, {c:1,d:1}], e:1,f:1}"); - const CollatorInterface* collator = nullptr; - StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); - ASSERT_OK(expr.getStatus()); - std::unique_ptr<MatchExpression> rewrittenExpr = - SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); - - std::string findCmdRewritten = - "{find: 'testns'," - "filter: {$or:[{a:1,b:1,e:1,f:1}," - "{c:1,d:1,e:1,f:1}]}}"; - std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); - - ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); - } + // Rewrite (AND (OR (AND a b) (AND c d) e f) => (OR (AND a b e f) (AND c d e f)) + { + BSONObj queryObj = fromjson("{$or:[{a:1,b:1}, {c:1,d:1}], e:1,f:1}"); + const CollatorInterface* collator = nullptr; + StatusWithMatchExpression expr = MatchExpressionParser::parse(queryObj, collator); + ASSERT_OK(expr.getStatus()); + std::unique_ptr<MatchExpression> rewrittenExpr = + SubplanStage::rewriteToRootedOr(std::move(expr.getValue())); + + std::string findCmdRewritten = + "{find: 'testns'," + "filter: {$or:[{a:1,b:1,e:1,f:1}," + "{c:1,d:1,e:1,f:1}]}}"; + std::unique_ptr<CanonicalQuery> cqRewritten = cqFromFindCommand(findCmdRewritten); + + ASSERT(rewrittenExpr->equivalent(cqRewritten->root())); } -}; +} /** * Test the subplan stage's ability to answer a contained $or query. */ -class QueryStageSubplanPlanContainedOr : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - addIndex(BSON("b" << 1 << "a" << 1)); - addIndex(BSON("c" << 1 << "a" << 1)); - - BSONObj query = fromjson("{a: 1, $or: [{b: 2}, {c: 3}]}"); - - // Two of these documents match. - insert(BSON("_id" << 1 << "a" << 1 << "b" << 2)); - insert(BSON("_id" << 2 << "a" << 2 << "b" << 2)); - insert(BSON("_id" << 3 << "a" << 1 << "c" << 3)); - insert(BSON("_id" << 4 << "a" << 1 << "c" << 4)); - - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(query); - auto cq = unittest::assertGet(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); - - Collection* collection = ctx.getCollection(); - - // Get planner params. - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); - - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); - - // Plan selection should succeed due to falling back on regular planning. - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - - // Work the stage until it produces all results. - size_t numResults = 0; - PlanStage::StageState stageState = PlanStage::NEED_TIME; - while (stageState != PlanStage::IS_EOF) { - WorkingSetID id = WorkingSet::INVALID_ID; - stageState = subplan->work(&id); - ASSERT_NE(stageState, PlanStage::DEAD); - ASSERT_NE(stageState, PlanStage::FAILURE); - - if (stageState == PlanStage::ADVANCED) { - ++numResults; - WorkingSetMember* member = ws.get(id); - ASSERT(member->hasObj()); - ASSERT(SimpleBSONObjComparator::kInstance.evaluate( - member->obj.value() == BSON("_id" << 1 << "a" << 1 << "b" << 2)) || - SimpleBSONObjComparator::kInstance.evaluate( - member->obj.value() == BSON("_id" << 3 << "a" << 1 << "c" << 3))); - } +TEST_F(QueryStageSubplanTest, QueryStageSubplanPlanContainedOr) { + OldClientWriteContext ctx(opCtx(), nss.ns()); + addIndex(BSON("b" << 1 << "a" << 1)); + addIndex(BSON("c" << 1 << "a" << 1)); + + BSONObj query = fromjson("{a: 1, $or: [{b: 2}, {c: 3}]}"); + + // Two of these documents match. + insert(BSON("_id" << 1 << "a" << 1 << "b" << 2)); + insert(BSON("_id" << 2 << "a" << 2 << "b" << 2)); + insert(BSON("_id" << 3 << "a" << 1 << "c" << 3)); + insert(BSON("_id" << 4 << "a" << 1 << "c" << 4)); + + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(query); + auto cq = unittest::assertGet(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); + + Collection* collection = ctx.getCollection(); + + // Get planner params. + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); + + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); + + // Plan selection should succeed due to falling back on regular planning. + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + + // Work the stage until it produces all results. + size_t numResults = 0; + PlanStage::StageState stageState = PlanStage::NEED_TIME; + while (stageState != PlanStage::IS_EOF) { + WorkingSetID id = WorkingSet::INVALID_ID; + stageState = subplan->work(&id); + ASSERT_NE(stageState, PlanStage::DEAD); + ASSERT_NE(stageState, PlanStage::FAILURE); + + if (stageState == PlanStage::ADVANCED) { + ++numResults; + WorkingSetMember* member = ws.get(id); + ASSERT(member->hasObj()); + ASSERT(SimpleBSONObjComparator::kInstance.evaluate( + member->obj.value() == BSON("_id" << 1 << "a" << 1 << "b" << 2)) || + SimpleBSONObjComparator::kInstance.evaluate( + member->obj.value() == BSON("_id" << 3 << "a" << 1 << "c" << 3))); } - - ASSERT_EQ(numResults, 2U); } -}; + + ASSERT_EQ(numResults, 2U); +} /** * Test the subplan stage's ability to answer a rooted $or query with a $ne and a sort. * * Regression test for SERVER-19388. */ -class QueryStageSubplanPlanRootedOrNE : public QueryStageSubplanBase { -public: - void run() { - OldClientWriteContext ctx(&_opCtx, nss.ns()); - addIndex(BSON("a" << 1 << "b" << 1)); - addIndex(BSON("a" << 1 << "c" << 1)); - - // Every doc matches. - insert(BSON("_id" << 1 << "a" << 1)); - insert(BSON("_id" << 2 << "a" << 2)); - insert(BSON("_id" << 3 << "a" << 3)); - insert(BSON("_id" << 4)); - - auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setFilter(fromjson("{$or: [{a: 1}, {a: {$ne:1}}]}")); - qr->setSort(BSON("d" << 1)); - auto cq = unittest::assertGet(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); - - Collection* collection = ctx.getCollection(); - - QueryPlannerParams plannerParams; - fillOutPlannerParams(&_opCtx, collection, cq.get(), &plannerParams); - - WorkingSet ws; - std::unique_ptr<SubplanStage> subplan( - new SubplanStage(&_opCtx, collection, &ws, plannerParams, cq.get())); - - PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); - ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); - - size_t numResults = 0; - PlanStage::StageState stageState = PlanStage::NEED_TIME; - while (stageState != PlanStage::IS_EOF) { - WorkingSetID id = WorkingSet::INVALID_ID; - stageState = subplan->work(&id); - ASSERT_NE(stageState, PlanStage::DEAD); - ASSERT_NE(stageState, PlanStage::FAILURE); - if (stageState == PlanStage::ADVANCED) { - ++numResults; - } +TEST_F(QueryStageSubplanTest, QueryStageSubplanPlanRootedOrNE) { + OldClientWriteContext ctx(opCtx(), nss.ns()); + addIndex(BSON("a" << 1 << "b" << 1)); + addIndex(BSON("a" << 1 << "c" << 1)); + + // Every doc matches. + insert(BSON("_id" << 1 << "a" << 1)); + insert(BSON("_id" << 2 << "a" << 2)); + insert(BSON("_id" << 3 << "a" << 3)); + insert(BSON("_id" << 4)); + + auto qr = stdx::make_unique<QueryRequest>(nss); + qr->setFilter(fromjson("{$or: [{a: 1}, {a: {$ne:1}}]}")); + qr->setSort(BSON("d" << 1)); + auto cq = unittest::assertGet(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); + + Collection* collection = ctx.getCollection(); + + QueryPlannerParams plannerParams; + fillOutPlannerParams(opCtx(), collection, cq.get(), &plannerParams); + + WorkingSet ws; + std::unique_ptr<SubplanStage> subplan( + new SubplanStage(opCtx(), collection, &ws, plannerParams, cq.get())); + + PlanYieldPolicy yieldPolicy(PlanExecutor::NO_YIELD, _clock); + ASSERT_OK(subplan->pickBestPlan(&yieldPolicy)); + + size_t numResults = 0; + PlanStage::StageState stageState = PlanStage::NEED_TIME; + while (stageState != PlanStage::IS_EOF) { + WorkingSetID id = WorkingSet::INVALID_ID; + stageState = subplan->work(&id); + ASSERT_NE(stageState, PlanStage::DEAD); + ASSERT_NE(stageState, PlanStage::FAILURE); + if (stageState == PlanStage::ADVANCED) { + ++numResults; } - - ASSERT_EQ(numResults, 4U); } -}; -class All : public Suite { -public: - All() : Suite("query_stage_subplan") {} - - void setupTests() { - add<QueryStageSubplanGeo2dOr>(); - add<QueryStageSubplanPlanFromCache>(); - add<QueryStageSubplanDontCacheZeroResults>(); - add<QueryStageSubplanDontCacheTies>(); - add<QueryStageSubplanCanUseSubplanning>(); - add<QueryStageSubplanRewriteToRootedOr>(); - add<QueryStageSubplanPlanContainedOr>(); - add<QueryStageSubplanPlanRootedOrNE>(); + ASSERT_EQ(numResults, 4U); +} + +TEST_F(QueryStageSubplanTest, ShouldReportErrorIfExceedsTimeLimitDuringPlanning) { + OldClientWriteContext ctx(opCtx(), nss.ns()); + // Build a query with a rooted $or. + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(BSON("$or" << BSON_ARRAY(BSON("p1" << 1) << BSON("p2" << 2)))); + auto canonicalQuery = + uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest))); + + // Add 4 indices: 2 for each predicate to choose from. + addIndex(BSON("p1" << 1 << "opt1" << 1)); + addIndex(BSON("p1" << 1 << "opt2" << 1)); + addIndex(BSON("p2" << 1 << "opt1" << 1)); + addIndex(BSON("p2" << 1 << "opt2" << 1)); + QueryPlannerParams params; + fillOutPlannerParams(opCtx(), ctx.getCollection(), canonicalQuery.get(), ¶ms); + + // Add some data so planning has to do some thinking. + for (int i = 0; i < 100; ++i) { + insert(BSON("_id" << i << "p1" << 1 << "p2" << 1)); + insert(BSON("_id" << 2 * i << "p1" << 1 << "p2" << 2)); + insert(BSON("_id" << 3 * i << "p1" << 2 << "p2" << 1)); + insert(BSON("_id" << 4 * i << "p1" << 2 << "p2" << 2)); } -}; - -SuiteInstance<All> all; -} // namespace QueryStageSubplan + // Create the SubplanStage. + WorkingSet workingSet; + SubplanStage subplanStage( + opCtx(), ctx.getCollection(), &workingSet, params, canonicalQuery.get()); + + AlwaysTimeOutYieldPolicy alwaysTimeOutPolicy(serviceContext()->getFastClockSource()); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, subplanStage.pickBestPlan(&alwaysTimeOutPolicy)); +} + +TEST_F(QueryStageSubplanTest, ShouldReportErrorIfKilledDuringPlanning) { + OldClientWriteContext ctx(opCtx(), nss.ns()); + // Build a query with a rooted $or. + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(BSON("$or" << BSON_ARRAY(BSON("p1" << 1) << BSON("p2" << 2)))); + auto canonicalQuery = + uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest))); + + // Add 4 indices: 2 for each predicate to choose from. + addIndex(BSON("p1" << 1 << "opt1" << 1)); + addIndex(BSON("p1" << 1 << "opt2" << 1)); + addIndex(BSON("p2" << 1 << "opt1" << 1)); + addIndex(BSON("p2" << 1 << "opt2" << 1)); + QueryPlannerParams params; + fillOutPlannerParams(opCtx(), ctx.getCollection(), canonicalQuery.get(), ¶ms); + + // Create the SubplanStage. + WorkingSet workingSet; + SubplanStage subplanStage( + opCtx(), ctx.getCollection(), &workingSet, params, canonicalQuery.get()); + + AlwaysPlanKilledYieldPolicy alwaysPlanKilledYieldPolicy(serviceContext()->getFastClockSource()); + ASSERT_EQ(ErrorCodes::QueryPlanKilled, subplanStage.pickBestPlan(&alwaysPlanKilledYieldPolicy)); +} + +} // namespace +} // namespace mongo |