summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-07-28 17:17:51 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-08-28 11:24:48 -0400
commit55a85da4980f1967f88bbccbd43646ee89c6301f (patch)
treed0911d9ca87de609e2a3d4d5391ec0752a472f5f /src/mongo/db/query/plan_executor.cpp
parent6e2cc35d6d4370804f09665b243d1e4d5d418ec0 (diff)
downloadmongo-55a85da4980f1967f88bbccbd43646ee89c6301f.tar.gz
SERVER-30410 Ensure executor is saved after tailable cursor time out.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--src/mongo/db/query/plan_executor.cpp122
1 files changed, 74 insertions, 48 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 4fb1a8458b4..dbeb1d56b71 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
@@ -72,9 +73,30 @@ struct CappedInsertNotifierData {
namespace {
-namespace {
MONGO_FP_DECLARE(planExecutorAlwaysFails);
-} // namespace
+
+/**
+ * Constructs a PlanYieldPolicy based on 'policy'.
+ */
+std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutor* exec,
+ PlanExecutor::YieldPolicy policy) {
+ switch (policy) {
+ case PlanExecutor::YieldPolicy::YIELD_AUTO:
+ case PlanExecutor::YieldPolicy::YIELD_MANUAL:
+ case PlanExecutor::YieldPolicy::NO_YIELD:
+ case PlanExecutor::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY: {
+ return stdx::make_unique<PlanYieldPolicy>(exec, policy);
+ }
+ case PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT: {
+ return stdx::make_unique<AlwaysTimeOutYieldPolicy>(exec);
+ }
+ case PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED: {
+ return stdx::make_unique<AlwaysPlanKilledYieldPolicy>(exec);
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
/**
* Retrieves the first stage of a given type from the plan tree, or NULL
@@ -95,7 +117,7 @@ PlanStage* getStageByType(PlanStage* root, StageType type) {
return NULL;
}
-}
+} // namespace
// static
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make(
@@ -202,7 +224,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
_root(std::move(rt)),
_nss(std::move(nss)),
// There's no point in yielding if the collection doesn't exist.
- _yieldPolicy(new PlanYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) {
+ _yieldPolicy(makeYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) {
// We may still need to initialize _nss from either collection or _cq.
if (!_nss.isEmpty()) {
return; // We already have an _nss set, so there's nothing more to do.
@@ -327,7 +349,7 @@ void PlanExecutor::saveState() {
_currentState = kSaved;
}
-bool PlanExecutor::restoreState() {
+Status PlanExecutor::restoreState() {
try {
return restoreStateWithoutRetrying();
} catch (const WriteConflictException&) {
@@ -339,7 +361,7 @@ bool PlanExecutor::restoreState() {
}
}
-bool PlanExecutor::restoreStateWithoutRetrying() {
+Status PlanExecutor::restoreStateWithoutRetrying() {
invariant(_currentState == kSaved);
if (!isMarkedAsKilled()) {
@@ -347,7 +369,9 @@ bool PlanExecutor::restoreStateWithoutRetrying() {
}
_currentState = kUsable;
- return !isMarkedAsKilled();
+ return isMarkedAsKilled()
+ ? Status{ErrorCodes::QueryPlanKilled, "query killed during yield: " + *_killReason}
+ : Status::OK();
}
void PlanExecutor::detachFromOperationContext() {
@@ -401,6 +425,9 @@ bool PlanExecutor::shouldWaitForInserts() {
if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData() &&
mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
_opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ // We expect awaitData cursors to be yielding.
+ invariant(_yieldPolicy->canReleaseLocksDuringExecution());
+
// For operations with a last committed opTime, we should not wait if the replication
// coordinator's lastCommittedOpTime has changed.
if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
@@ -413,28 +440,23 @@ bool PlanExecutor::shouldWaitForInserts() {
}
std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() {
- // If we cannot yield, we should retry immediately when we hit EOF, so do not get
- // a CappedInsertNotifier.
- if (!_yieldPolicy->canReleaseLocksDuringExecution())
- return nullptr;
+ // We don't expect to need a capped insert notifier for non-yielding plans.
+ invariant(_yieldPolicy->canReleaseLocksDuringExecution());
// We can only wait if we have a collection; otherwise we should retry immediately when
// we hit EOF.
dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
auto db = dbHolder().get(_opCtx, _nss.db());
- if (!db)
- return nullptr;
+ invariant(db);
auto collection = db->getCollection(_opCtx, _nss);
- if (!collection)
- return nullptr;
+ invariant(collection);
return collection->getCappedInsertNotifier();
}
-bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) {
- // We tested to see if we could wait when getting the CappedInsertNotifier.
- if (!notifierData->notifier)
- return true;
+PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData,
+ Snapshotted<BSONObj>* errorObj) {
+ invariant(notifierData->notifier);
// The notifier wait() method will not wait unless the version passed to it matches the
// current version of the notifier. Since the version passed to it is the current version
@@ -446,12 +468,16 @@ bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) {
ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
auto opCtx = _opCtx;
uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
- bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
+ auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
const auto timeout = opCtx->getRemainingMaxTimeMicros();
notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
});
notifierData->lastEOFVersion = currentNotifierVersion;
- return yieldResult;
+ if (yieldResult.isOK()) {
+ // There may be more results, try to get more data.
+ return ADVANCED;
+ }
+ return swallowTimeoutIfAwaitData(yieldResult, errorObj);
}
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
@@ -505,18 +531,9 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
// 3) we need to yield and retry due to a WriteConflictException.
// In all cases, the actual yielding happens here.
if (_yieldPolicy->shouldYield()) {
- if (!_yieldPolicy->yield(fetcher.get())) {
- // A return of false from a yield should only happen if we've been killed during the
- // yield.
- invariant(isMarkedAsKilled());
-
- if (NULL != objOut) {
- Status status(ErrorCodes::OperationFailed,
- str::stream() << "Operation aborted because: " << *_killReason);
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
- }
- return PlanExecutor::DEAD;
+ auto yieldStatus = _yieldPolicy->yield(fetcher.get());
+ if (!yieldStatus.isOK()) {
+ return swallowTimeoutIfAwaitData(yieldStatus, objOut);
}
}
@@ -589,23 +606,15 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- if (shouldWaitForInserts()) {
- const bool locksReacquiredAfterYield = waitForInserts(&cappedInsertNotifierData);
- if (locksReacquiredAfterYield) {
- // There may be more results, try to get more data.
- continue;
- }
- invariant(isMarkedAsKilled());
- if (objOut) {
- Status status(ErrorCodes::OperationFailed,
- str::stream() << "Operation aborted because: " << *_killReason);
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
- }
- return PlanExecutor::DEAD;
- } else {
+ if (!shouldWaitForInserts()) {
return PlanExecutor::IS_EOF;
}
+ const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut);
+ if (waitResult == PlanExecutor::ADVANCED) {
+ // There may be more results, keep going.
+ continue;
+ }
+ return waitResult;
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
@@ -677,6 +686,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
+PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
+ Status yieldError, Snapshotted<BSONObj>* errorObj) const {
+ if (yieldError == ErrorCodes::ExceededTimeLimit) {
+ if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData()) {
+ // If the cursor is tailable then exceeding the time limit should not
+ // destroy this PlanExecutor, we should just stop waiting for inserts.
+ return PlanExecutor::IS_EOF;
+ }
+ }
+
+ if (errorObj) {
+ *errorObj = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(yieldError));
+ }
+ return PlanExecutor::DEAD;
+}
+
//
// PlanExecutor::Deleter
//