diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-07-28 17:17:51 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-28 11:24:48 -0400 |
commit | 55a85da4980f1967f88bbccbd43646ee89c6301f (patch) | |
tree | d0911d9ca87de609e2a3d4d5391ec0752a472f5f /src/mongo/db/query/plan_executor.cpp | |
parent | 6e2cc35d6d4370804f09665b243d1e4d5d418ec0 (diff) | |
download | mongo-55a85da4980f1967f88bbccbd43646ee89c6301f.tar.gz |
SERVER-30410 Ensure executor is saved after tailable cursor time out.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 122 |
1 files changed, 74 insertions, 48 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 4fb1a8458b4..dbeb1d56b71 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -44,6 +44,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" @@ -72,9 +73,30 @@ struct CappedInsertNotifierData { namespace { -namespace { MONGO_FP_DECLARE(planExecutorAlwaysFails); -} // namespace + +/** + * Constructs a PlanYieldPolicy based on 'policy'. + */ +std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutor* exec, + PlanExecutor::YieldPolicy policy) { + switch (policy) { + case PlanExecutor::YieldPolicy::YIELD_AUTO: + case PlanExecutor::YieldPolicy::YIELD_MANUAL: + case PlanExecutor::YieldPolicy::NO_YIELD: + case PlanExecutor::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY: { + return stdx::make_unique<PlanYieldPolicy>(exec, policy); + } + case PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT: { + return stdx::make_unique<AlwaysTimeOutYieldPolicy>(exec); + } + case PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED: { + return stdx::make_unique<AlwaysPlanKilledYieldPolicy>(exec); + } + default: + MONGO_UNREACHABLE; + } +} /** * Retrieves the first stage of a given type from the plan tree, or NULL @@ -95,7 +117,7 @@ PlanStage* getStageByType(PlanStage* root, StageType type) { return NULL; } -} +} // namespace // static StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( @@ -202,7 +224,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, _root(std::move(rt)), _nss(std::move(nss)), // There's no point in yielding if the collection doesn't exist. - _yieldPolicy(new PlanYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { + _yieldPolicy(makeYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { // We may still need to initialize _nss from either collection or _cq. if (!_nss.isEmpty()) { return; // We already have an _nss set, so there's nothing more to do. @@ -327,7 +349,7 @@ void PlanExecutor::saveState() { _currentState = kSaved; } -bool PlanExecutor::restoreState() { +Status PlanExecutor::restoreState() { try { return restoreStateWithoutRetrying(); } catch (const WriteConflictException&) { @@ -339,7 +361,7 @@ bool PlanExecutor::restoreState() { } } -bool PlanExecutor::restoreStateWithoutRetrying() { +Status PlanExecutor::restoreStateWithoutRetrying() { invariant(_currentState == kSaved); if (!isMarkedAsKilled()) { @@ -347,7 +369,9 @@ bool PlanExecutor::restoreStateWithoutRetrying() { } _currentState = kUsable; - return !isMarkedAsKilled(); + return isMarkedAsKilled() + ? Status{ErrorCodes::QueryPlanKilled, "query killed during yield: " + *_killReason} + : Status::OK(); } void PlanExecutor::detachFromOperationContext() { @@ -401,6 +425,9 @@ bool PlanExecutor::shouldWaitForInserts() { if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData() && mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + // We expect awaitData cursors to be yielding. + invariant(_yieldPolicy->canReleaseLocksDuringExecution()); + // For operations with a last committed opTime, we should not wait if the replication // coordinator's lastCommittedOpTime has changed. if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) { @@ -413,28 +440,23 @@ bool PlanExecutor::shouldWaitForInserts() { } std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() { - // If we cannot yield, we should retry immediately when we hit EOF, so do not get - // a CappedInsertNotifier. - if (!_yieldPolicy->canReleaseLocksDuringExecution()) - return nullptr; + // We don't expect to need a capped insert notifier for non-yielding plans. + invariant(_yieldPolicy->canReleaseLocksDuringExecution()); // We can only wait if we have a collection; otherwise we should retry immediately when // we hit EOF. dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); auto db = dbHolder().get(_opCtx, _nss.db()); - if (!db) - return nullptr; + invariant(db); auto collection = db->getCollection(_opCtx, _nss); - if (!collection) - return nullptr; + invariant(collection); return collection->getCappedInsertNotifier(); } -bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) { - // We tested to see if we could wait when getting the CappedInsertNotifier. - if (!notifierData->notifier) - return true; +PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData, + Snapshotted<BSONObj>* errorObj) { + invariant(notifierData->notifier); // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version @@ -446,12 +468,16 @@ bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) { ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { + auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { const auto timeout = opCtx->getRemainingMaxTimeMicros(); notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); }); notifierData->lastEOFVersion = currentNotifierVersion; - return yieldResult; + if (yieldResult.isOK()) { + // There may be more results, try to get more data. + return ADVANCED; + } + return swallowTimeoutIfAwaitData(yieldResult, errorObj); } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -505,18 +531,9 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, // 3) we need to yield and retry due to a WriteConflictException. // In all cases, the actual yielding happens here. if (_yieldPolicy->shouldYield()) { - if (!_yieldPolicy->yield(fetcher.get())) { - // A return of false from a yield should only happen if we've been killed during the - // yield. - invariant(isMarkedAsKilled()); - - if (NULL != objOut) { - Status status(ErrorCodes::OperationFailed, - str::stream() << "Operation aborted because: " << *_killReason); - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - } - return PlanExecutor::DEAD; + auto yieldStatus = _yieldPolicy->yield(fetcher.get()); + if (!yieldStatus.isOK()) { + return swallowTimeoutIfAwaitData(yieldStatus, objOut); } } @@ -589,23 +606,15 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. } else if (PlanStage::IS_EOF == code) { - if (shouldWaitForInserts()) { - const bool locksReacquiredAfterYield = waitForInserts(&cappedInsertNotifierData); - if (locksReacquiredAfterYield) { - // There may be more results, try to get more data. - continue; - } - invariant(isMarkedAsKilled()); - if (objOut) { - Status status(ErrorCodes::OperationFailed, - str::stream() << "Operation aborted because: " << *_killReason); - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - } - return PlanExecutor::DEAD; - } else { + if (!shouldWaitForInserts()) { return PlanExecutor::IS_EOF; } + const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut); + if (waitResult == PlanExecutor::ADVANCED) { + // There may be more results, keep going. + continue; + } + return waitResult; } else { invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); @@ -677,6 +686,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } +PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( + Status yieldError, Snapshotted<BSONObj>* errorObj) const { + if (yieldError == ErrorCodes::ExceededTimeLimit) { + if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData()) { + // If the cursor is tailable then exceeding the time limit should not + // destroy this PlanExecutor, we should just stop waiting for inserts. + return PlanExecutor::IS_EOF; + } + } + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldError)); + } + return PlanExecutor::DEAD; +} + // // PlanExecutor::Deleter // |