summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor.cpp
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-07-10 13:47:13 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-07-11 13:51:24 -0400
commit3bab15739e421e9eed4bf180cbcf5c7392a9a90d (patch)
treef346909f73f9cac8d1eaf3811944e521945cf8d8 /src/mongo/db/query/plan_executor.cpp
parentd712243cb381d5ae98d4bc132ace16aac91d0fe9 (diff)
downloadmongo-3bab15739e421e9eed4bf180cbcf5c7392a9a90d.tar.gz
SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--src/mongo/db/query/plan_executor.cpp59
1 files changed, 58 insertions, 1 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 2789e660b76..ff18a403286 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,6 +32,8 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -47,6 +49,7 @@
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -56,6 +59,9 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
+
namespace {
namespace {
@@ -380,6 +386,41 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
return getNextImpl(objOut, dlOut);
}
+
+bool PlanExecutor::shouldWaitForInserts() {
+ // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+ // we should wait for inserts.
+ return mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero();
+}
+
+bool PlanExecutor::waitForInserts() {
+ // If we cannot yield, we should retry immediately.
+ if (!_yieldPolicy->canReleaseLocksDuringExecution())
+ return true;
+
+ // We can only wait if we have a collection; otherwise retry immediately.
+ dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
+ auto db = dbHolder().get(_opCtx, _nss.db());
+ if (!db)
+ return true;
+ auto collection = db->getCollection(_opCtx, _nss);
+ if (!collection)
+ return true;
+
+ auto notifier = collection->getCappedInsertNotifier();
+ uint64_t notifierVersion = notifier->getVersion();
+ auto curOp = CurOp::get(_opCtx);
+ curOp->pauseTimer();
+ ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+ auto opCtx = _opCtx;
+ bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifier->wait(notifierVersion, timeout);
+ });
+ return yieldResult;
+}
+
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::OperationFailed,
@@ -508,7 +549,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- return PlanExecutor::IS_EOF;
+ if (shouldWaitForInserts()) {
+ const bool locksReacquiredAfterYield = waitForInserts();
+ if (locksReacquiredAfterYield) {
+ // There may be more results, try to get more data.
+ continue;
+ }
+ invariant(isMarkedAsKilled());
+ if (objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
+ }
+ return PlanExecutor::DEAD;
+ } else {
+ return PlanExecutor::IS_EOF;
+ }
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);