diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-10 13:47:13 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-11 13:51:24 -0400 |
commit | 3bab15739e421e9eed4bf180cbcf5c7392a9a90d (patch) | |
tree | f346909f73f9cac8d1eaf3811944e521945cf8d8 /src/mongo/db/query/plan_executor.cpp | |
parent | d712243cb381d5ae98d4bc132ace16aac91d0fe9 (diff) | |
download | mongo-3bab15739e421e9eed4bf180cbcf5c7392a9a90d.tar.gz |
SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 59 |
1 files changed, 58 insertions, 1 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 2789e660b76..ff18a403286 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -32,6 +32,8 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" @@ -47,6 +49,7 @@ #include "mongo/db/storage/record_fetcher.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/stacktrace.h" namespace mongo { @@ -56,6 +59,9 @@ using std::string; using std::unique_ptr; using std::vector; +const OperationContext::Decoration<bool> shouldWaitForInserts = + OperationContext::declareDecoration<bool>(); + namespace { namespace { @@ -380,6 +386,41 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o return getNextImpl(objOut, dlOut); } + +bool PlanExecutor::shouldWaitForInserts() { + // If this is an awaitData-respecting operation and we have time left and we're not interrupted, + // we should wait for inserts. + return mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && + _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero(); +} + +bool PlanExecutor::waitForInserts() { + // If we cannot yield, we should retry immediately. + if (!_yieldPolicy->canReleaseLocksDuringExecution()) + return true; + + // We can only wait if we have a collection; otherwise retry immediately. + dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); + auto db = dbHolder().get(_opCtx, _nss.db()); + if (!db) + return true; + auto collection = db->getCollection(_opCtx, _nss); + if (!collection) + return true; + + auto notifier = collection->getCappedInsertNotifier(); + uint64_t notifierVersion = notifier->getVersion(); + auto curOp = CurOp::get(_opCtx); + curOp->pauseTimer(); + ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); + auto opCtx = _opCtx; + bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] { + const auto timeout = opCtx->getRemainingMaxTimeMicros(); + notifier->wait(notifierVersion, timeout); + }); + return yieldResult; +} + PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { Status status(ErrorCodes::OperationFailed, @@ -508,7 +549,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. } else if (PlanStage::IS_EOF == code) { - return PlanExecutor::IS_EOF; + if (shouldWaitForInserts()) { + const bool locksReacquiredAfterYield = waitForInserts(); + if (locksReacquiredAfterYield) { + // There may be more results, try to get more data. + continue; + } + invariant(isMarkedAsKilled()); + if (objOut) { + Status status(ErrorCodes::OperationFailed, + str::stream() << "Operation aborted because: " << *_killReason); + *objOut = Snapshotted<BSONObj>( + SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); + } + return PlanExecutor::DEAD; + } else { + return PlanExecutor::IS_EOF; + } } else { invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); |