summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor.cpp
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2017-12-14 14:25:52 -0500
committerMartin Neupauer <martin.neupauer@mongodb.com>2017-12-18 11:19:05 -0500
commit962c5c61c93776aa4d1a8efb67a1a80cb3bb2ad0 (patch)
tree9ad8558783798d7c694ec82a4eca648e41b68ca9 /src/mongo/db/query/plan_executor.cpp
parente972d40e588e9d1b920a75086f0b36c603fbdd3d (diff)
downloadmongo-962c5c61c93776aa4d1a8efb67a1a80cb3bb2ad0.tar.gz
SERVER-31684 Fix unexpected "operation exceeded time limit" errors
The changestream queries used an operation context deadline to track a wait time before returning EOF. This occasionaly interfered with normal operation deadlines leading to unexpected errors.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--src/mongo/db/query/plan_executor.cpp43
1 files changed, 18 insertions, 25 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index e1ae5988b3a..b2619986a46 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -62,8 +63,6 @@ using std::string;
using std::unique_ptr;
using std::vector;
-const OperationContext::Decoration<bool> shouldWaitForInserts =
- OperationContext::declareDecoration<bool>();
const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
OperationContext::declareDecoration<repl::OpTime>();
@@ -424,8 +423,9 @@ bool PlanExecutor::shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
- mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
- _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
+ awaitDataState(_opCtx).waitForInsertsDeadline >
+ _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
// We expect awaitData cursors to be yielding.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -470,15 +470,21 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n
auto opCtx = _opCtx;
uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
- const auto timeout = opCtx->getRemainingMaxTimeMicros();
- notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
+ const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
+ notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
});
notifierData->lastEOFVersion = currentNotifierVersion;
+
if (yieldResult.isOK()) {
// There may be more results, try to get more data.
return ADVANCED;
}
- return swallowTimeoutIfAwaitData(yieldResult, errorObj);
+
+ if (errorObj) {
+ *errorObj = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(yieldResult));
+ }
+ return DEAD;
}
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
@@ -534,7 +540,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
if (_yieldPolicy->shouldYield()) {
auto yieldStatus = _yieldPolicy->yield(fetcher.get());
if (!yieldStatus.isOK()) {
- return swallowTimeoutIfAwaitData(yieldStatus, objOut);
+ if (objOut) {
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus));
+ }
+ return PlanExecutor::DEAD;
}
}
@@ -687,23 +697,6 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
-PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
- Status yieldError, Snapshotted<BSONObj>* errorObj) const {
- if (yieldError == ErrorCodes::ExceededTimeLimit) {
- if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) {
- // If the cursor is tailable then exceeding the time limit should not destroy this
- // PlanExecutor, we should just stop waiting for inserts.
- return PlanExecutor::IS_EOF;
- }
- }
-
- if (errorObj) {
- *errorObj = Snapshotted<BSONObj>(SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(yieldError));
- }
- return PlanExecutor::DEAD;
-}
-
Timestamp PlanExecutor::getLatestOplogTimestamp() {
if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();