diff options
author | Spencer Jackson <spencer.jackson@mongodb.com> | 2017-12-14 14:32:18 -0500 |
---|---|---|
committer | Spencer Jackson <spencer.jackson@mongodb.com> | 2017-12-14 14:32:18 -0500 |
commit | 1f38fb202b9f8696cf28d39e674242e036c0b75c (patch) | |
tree | 249a9c923a97dd7e37d8898c3782bd27a6dd53a3 /src/mongo/db/query/plan_executor.cpp | |
parent | 47247293f18ea581954f6fcf4c0018b7828e3c3a (diff) | |
download | mongo-1f38fb202b9f8696cf28d39e674242e036c0b75c.tar.gz |
Revert "SERVER-31684 Fix unexpected "operation exceeded time limit" errors"
This reverts commit b79e5f04ffc79b5892f89c22b9e5f26a297b1185.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index b2619986a46..e1ae5988b3a 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -45,7 +45,6 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" -#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" @@ -63,6 +62,8 @@ using std::string; using std::unique_ptr; using std::vector; +const OperationContext::Decoration<bool> shouldWaitForInserts = + OperationContext::declareDecoration<bool>(); const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = OperationContext::declareDecoration<repl::OpTime>(); @@ -423,9 +424,8 @@ bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && - awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && - awaitDataState(_opCtx).waitForInsertsDeadline > - _opCtx->getServiceContext()->getPreciseClockSource()->now()) { + mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && + _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -470,21 +470,15 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { - const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + const auto timeout = opCtx->getRemainingMaxTimeMicros(); + notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); }); notifierData->lastEOFVersion = currentNotifierVersion; - if (yieldResult.isOK()) { // There may be more results, try to get more data. return ADVANCED; } - - if (errorObj) { - *errorObj = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldResult)); - } - return DEAD; + return swallowTimeoutIfAwaitData(yieldResult, errorObj); } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -540,11 +534,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, if (_yieldPolicy->shouldYield()) { auto yieldStatus = _yieldPolicy->yield(fetcher.get()); if (!yieldStatus.isOK()) { - if (objOut) { - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); - } - return PlanExecutor::DEAD; + return swallowTimeoutIfAwaitData(yieldStatus, objOut); } } @@ -697,6 +687,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } +PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( + Status yieldError, Snapshotted<BSONObj>* errorObj) const { + if (yieldError == ErrorCodes::ExceededTimeLimit) { + if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { + // If the cursor is tailable then exceeding the time limit should not destroy this + // PlanExecutor, we should just stop waiting for inserts. + return PlanExecutor::IS_EOF; + } + } + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldError)); + } + return PlanExecutor::DEAD; +} + Timestamp PlanExecutor::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); |