author    Matthew Russotto <matthew.russotto@10gen.com>    2017-07-14 17:15:52 -0400
committer Matthew Russotto <matthew.russotto@10gen.com>    2017-07-17 08:52:57 -0400
commit  3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch)
tree    8f318b2b52852a1511ed6da6ede9ac62cbe67d4d /src/mongo/db/query/plan_executor.cpp
parent  a1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff)
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime

Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries""

This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--  src/mongo/db/query/plan_executor.cpp  71
 1 file changed, 70 insertions(+), 1 deletion(-)
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 2789e660b76..17aa8d42f02 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,6 +32,8 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -43,10 +45,12 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -56,6 +60,11 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
+const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
+ OperationContext::declareDecoration<repl::OpTime>();
+
namespace {
@@ -380,6 +389,50 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
return getNextImpl(objOut, dlOut);
}
+
+bool PlanExecutor::shouldWaitForInserts() {
+ // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+ // we should wait for inserts.
+ if (mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ // For operations with a last committed opTime, we should not wait if the replication
+ // coordinator's lastCommittedOpTime has changed.
+ if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
+ return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+ }
+ return true;
+ }
+ return false;
+}
+
+bool PlanExecutor::waitForInserts() {
+ // If we cannot yield, we should retry immediately.
+ if (!_yieldPolicy->canReleaseLocksDuringExecution())
+ return true;
+
+ // We can only wait if we have a collection; otherwise retry immediately.
+ dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
+ auto db = dbHolder().get(_opCtx, _nss.db());
+ if (!db)
+ return true;
+ auto collection = db->getCollection(_opCtx, _nss);
+ if (!collection)
+ return true;
+
+ auto notifier = collection->getCappedInsertNotifier();
+ uint64_t notifierVersion = notifier->getVersion();
+ auto curOp = CurOp::get(_opCtx);
+ curOp->pauseTimer();
+ ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+ auto opCtx = _opCtx;
+ bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifier->wait(notifierVersion, timeout);
+ });
+ return yieldResult;
+}
+
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::OperationFailed,
@@ -508,7 +561,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- return PlanExecutor::IS_EOF;
+ if (shouldWaitForInserts()) {
+ const bool locksReacquiredAfterYield = waitForInserts();
+ if (locksReacquiredAfterYield) {
+ // There may be more results, try to get more data.
+ continue;
+ }
+ invariant(isMarkedAsKilled());
+ if (objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
+ }
+ return PlanExecutor::DEAD;
+ } else {
+ return PlanExecutor::IS_EOF;
+ }
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
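Editor's note on the wait logic: waitForInserts() above snapshots the capped insert notifier's version before yielding its locks, then blocks until the version changes or the remaining maxTimeMS elapses. Checking the version inside the wait closes the race where an insert lands between the EOF and the wait, which would otherwise be a missed wakeup. A standalone sketch of that pattern, assuming a hypothetical Notifier class in place of MongoDB's actual CappedInsertNotifier:

    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    // Hypothetical stand-in for CappedInsertNotifier: a version counter
    // guarded by a mutex, plus a condition variable for wakeups.
    class Notifier {
    public:
        uint64_t getVersion() const {
            std::lock_guard<std::mutex> lk(_m);
            return _version;
        }

        // Writers call this after an insert: bump the version, wake waiters.
        void notifyAll() {
            {
                std::lock_guard<std::mutex> lk(_m);
                ++_version;
            }
            _cv.notify_all();
        }

        // Block until the version moves past 'prevVersion' or the timeout
        // expires. Returns immediately if an insert already bumped the
        // version after the caller's getVersion() snapshot.
        void wait(uint64_t prevVersion, std::chrono::microseconds timeout) {
            std::unique_lock<std::mutex> lk(_m);
            _cv.wait_for(lk, timeout, [&] { return _version != prevVersion; });
        }

    private:
        mutable std::mutex _m;
        std::condition_variable _cv;
        uint64_t _version = 0;
    };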