diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2017-12-14 14:25:52 -0500 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2017-12-18 11:19:05 -0500 |
commit | 962c5c61c93776aa4d1a8efb67a1a80cb3bb2ad0 (patch) | |
tree | 9ad8558783798d7c694ec82a4eca648e41b68ca9 /src/mongo/db/query/plan_executor.cpp | |
parent | e972d40e588e9d1b920a75086f0b36c603fbdd3d (diff) | |
download | mongo-962c5c61c93776aa4d1a8efb67a1a80cb3bb2ad0.tar.gz |
SERVER-31684 Fix unexpected "operation exceeded time limit" errors
The changestream queries used an operation context deadline to track
a wait time before returning EOF. This occasionaly interfered with
normal operation deadlines leading to unexpected errors.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 43 |
1 files changed, 18 insertions, 25 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index e1ae5988b3a..b2619986a46 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -45,6 +45,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" @@ -62,8 +63,6 @@ using std::string; using std::unique_ptr; using std::vector; -const OperationContext::Decoration<bool> shouldWaitForInserts = - OperationContext::declareDecoration<bool>(); const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = OperationContext::declareDecoration<repl::OpTime>(); @@ -424,8 +423,9 @@ bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && - mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && - _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && + awaitDataState(_opCtx).waitForInsertsDeadline > + _opCtx->getServiceContext()->getPreciseClockSource()->now()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -470,15 +470,21 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { - const auto timeout = opCtx->getRemainingMaxTimeMicros(); - notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); + const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; + notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); }); notifierData->lastEOFVersion = currentNotifierVersion; + if (yieldResult.isOK()) { // There may be more results, try to get more data. return ADVANCED; } - return swallowTimeoutIfAwaitData(yieldResult, errorObj); + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldResult)); + } + return DEAD; } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -534,7 +540,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, if (_yieldPolicy->shouldYield()) { auto yieldStatus = _yieldPolicy->yield(fetcher.get()); if (!yieldStatus.isOK()) { - return swallowTimeoutIfAwaitData(yieldStatus, objOut); + if (objOut) { + *objOut = Snapshotted<BSONObj>( + SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); + } + return PlanExecutor::DEAD; } } @@ -687,23 +697,6 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } -PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( - Status yieldError, Snapshotted<BSONObj>* errorObj) const { - if (yieldError == ErrorCodes::ExceededTimeLimit) { - if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { - // If the cursor is tailable then exceeding the time limit should not destroy this - // PlanExecutor, we should just stop waiting for inserts. - return PlanExecutor::IS_EOF; - } - } - - if (errorObj) { - *errorObj = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldError)); - } - return PlanExecutor::DEAD; -} - Timestamp PlanExecutor::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); |