summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor.cpp
diff options
context:
space:
mode:
authorSpencer Jackson <spencer.jackson@mongodb.com>2017-12-14 14:32:18 -0500
committerSpencer Jackson <spencer.jackson@mongodb.com>2017-12-14 14:32:18 -0500
commit1f38fb202b9f8696cf28d39e674242e036c0b75c (patch)
tree249a9c923a97dd7e37d8898c3782bd27a6dd53a3 /src/mongo/db/query/plan_executor.cpp
parent47247293f18ea581954f6fcf4c0018b7828e3c3a (diff)
downloadmongo-1f38fb202b9f8696cf28d39e674242e036c0b75c.tar.gz
Revert "SERVER-31684 Fix unexpected "operation exceeded time limit" errors"
This reverts commit b79e5f04ffc79b5892f89c22b9e5f26a297b1185.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--src/mongo/db/query/plan_executor.cpp43
1 files changed, 25 insertions, 18 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index b2619986a46..e1ae5988b3a 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -63,6 +62,8 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
OperationContext::declareDecoration<repl::OpTime>();
@@ -423,9 +424,8 @@ bool PlanExecutor::shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
- awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
- awaitDataState(_opCtx).waitForInsertsDeadline >
- _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
+ mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
// We expect awaitData cursors to be yielding.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -470,21 +470,15 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n
auto opCtx = _opCtx;
uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
- const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
});
notifierData->lastEOFVersion = currentNotifierVersion;
-
if (yieldResult.isOK()) {
// There may be more results, try to get more data.
return ADVANCED;
}
-
- if (errorObj) {
- *errorObj = Snapshotted<BSONObj>(SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(yieldResult));
- }
- return DEAD;
+ return swallowTimeoutIfAwaitData(yieldResult, errorObj);
}
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
@@ -540,11 +534,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
if (_yieldPolicy->shouldYield()) {
auto yieldStatus = _yieldPolicy->yield(fetcher.get());
if (!yieldStatus.isOK()) {
- if (objOut) {
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus));
- }
- return PlanExecutor::DEAD;
+ return swallowTimeoutIfAwaitData(yieldStatus, objOut);
}
}
@@ -697,6 +687,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
+PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
+ Status yieldError, Snapshotted<BSONObj>* errorObj) const {
+ if (yieldError == ErrorCodes::ExceededTimeLimit) {
+ if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) {
+ // If the cursor is tailable then exceeding the time limit should not destroy this
+ // PlanExecutor, we should just stop waiting for inserts.
+ return PlanExecutor::IS_EOF;
+ }
+ }
+
+ if (errorObj) {
+ *errorObj = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(yieldError));
+ }
+ return PlanExecutor::DEAD;
+}
+
Timestamp PlanExecutor::getLatestOplogTimestamp() {
if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();