diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-14 17:15:52 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-17 08:52:57 -0400 |
commit | 3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch) | |
tree | 8f318b2b52852a1511ed6da6ede9ac62cbe67d4d /src/mongo/db/query/plan_executor.cpp | |
parent | a1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff) | |
download | mongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz |
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries""
This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 71 |
1 files changed, 70 insertions, 1 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 2789e660b76..17aa8d42f02 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -32,6 +32,8 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" @@ -43,10 +45,12 @@ #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/query/plan_yield_policy.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/record_fetcher.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/stacktrace.h" namespace mongo { @@ -56,6 +60,11 @@ using std::string; using std::unique_ptr; using std::vector; +const OperationContext::Decoration<bool> shouldWaitForInserts = + OperationContext::declareDecoration<bool>(); +const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = + OperationContext::declareDecoration<repl::OpTime>(); + namespace { namespace { @@ -380,6 +389,50 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o return getNextImpl(objOut, dlOut); } + +bool PlanExecutor::shouldWaitForInserts() { + // If this is an awaitData-respecting operation and we have time left and we're not interrupted, + // we should wait for inserts. + if (mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && + _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + // For operations with a last committed opTime, we should not wait if the replication + // coordinator's lastCommittedOpTime has changed. + if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) { + auto replCoord = repl::ReplicationCoordinator::get(_opCtx); + return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime(); + } + return true; + } + return false; +} + +bool PlanExecutor::waitForInserts() { + // If we cannot yield, we should retry immediately. + if (!_yieldPolicy->canReleaseLocksDuringExecution()) + return true; + + // We can only wait if we have a collection; otherwise retry immediately. + dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); + auto db = dbHolder().get(_opCtx, _nss.db()); + if (!db) + return true; + auto collection = db->getCollection(_opCtx, _nss); + if (!collection) + return true; + + auto notifier = collection->getCappedInsertNotifier(); + uint64_t notifierVersion = notifier->getVersion(); + auto curOp = CurOp::get(_opCtx); + curOp->pauseTimer(); + ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); + auto opCtx = _opCtx; + bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] { + const auto timeout = opCtx->getRemainingMaxTimeMicros(); + notifier->wait(notifierVersion, timeout); + }); + return yieldResult; +} + PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { Status status(ErrorCodes::OperationFailed, @@ -508,7 +561,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. } else if (PlanStage::IS_EOF == code) { - return PlanExecutor::IS_EOF; + if (shouldWaitForInserts()) { + const bool locksReacquiredAfterYield = waitForInserts(); + if (locksReacquiredAfterYield) { + // There may be more results, try to get more data. + continue; + } + invariant(isMarkedAsKilled()); + if (objOut) { + Status status(ErrorCodes::OperationFailed, + str::stream() << "Operation aborted because: " << *_killReason); + *objOut = Snapshotted<BSONObj>( + SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); + } + return PlanExecutor::DEAD; + } else { + return PlanExecutor::IS_EOF; + } } else { invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); |