From b79e5f04ffc79b5892f89c22b9e5f26a297b1185 Mon Sep 17 00:00:00 2001 From: Martin Neupauer Date: Wed, 15 Nov 2017 15:38:14 -0500 Subject: SERVER-31684 Fix unexpected "operation exceeded time limit" errors The changestream queries used an operation context deadline to track a wait time before returning EOF. This occasionaly interfered with normal operation deadlines leading to unexpected errors. --- src/mongo/db/catalog/collection.cpp | 24 ++----------- src/mongo/db/catalog/collection.h | 19 ++--------- src/mongo/db/commands/getmore_cmd.cpp | 21 +++++------- src/mongo/db/operation_context.h | 8 +++++ src/mongo/db/pipeline/document_source_cursor.cpp | 3 +- src/mongo/db/query/find.cpp | 4 ++- src/mongo/db/query/find_common.cpp | 3 ++ src/mongo/db/query/find_common.h | 20 +++++++++++ src/mongo/db/query/plan_executor.cpp | 43 ++++++++++-------------- src/mongo/db/query/plan_executor.h | 15 --------- 10 files changed, 68 insertions(+), 92 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 5d8e64a9bdd..6493d147605 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -152,33 +152,15 @@ void CappedInsertNotifier::notifyAll() { _notifier.notify_all(); } -void CappedInsertNotifier::_wait(stdx::unique_lock& lk, - uint64_t prevVersion, - Microseconds timeout) const { +void CappedInsertNotifier::waitUntil(uint64_t prevVersion, Date_t deadline) const { + stdx::unique_lock lk(_mutex); while (!_dead && prevVersion == _version) { - if (timeout == Microseconds::max()) { - _notifier.wait(lk); - } else if (stdx::cv_status::timeout == _notifier.wait_for(lk, timeout.toSystemDuration())) { + if (stdx::cv_status::timeout == _notifier.wait_until(lk, deadline.toSystemTimePoint())) { return; } } } -void CappedInsertNotifier::wait(uint64_t prevVersion, Microseconds timeout) const { - stdx::unique_lock lk(_mutex); - _wait(lk, prevVersion, timeout); -} - -void CappedInsertNotifier::wait(Microseconds timeout) const { - stdx::unique_lock lk(_mutex); - _wait(lk, _version, timeout); -} - -void CappedInsertNotifier::wait() const { - stdx::unique_lock lk(_mutex); - _wait(lk, _version, Microseconds::max()); -} - void CappedInsertNotifier::kill() { stdx::lock_guard lk(_mutex); _dead = true; diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 216bfc971af..1e9ad8ff4cd 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -112,17 +112,12 @@ public: void notifyAll(); /** - * Waits for 'timeout' microseconds, or until notifyAll() is called to indicate that new + * Waits until 'deadline', or until notifyAll() is called to indicate that new * data is available in the capped collection. * * NOTE: Waiting threads can be signaled by calling kill or notify* methods. */ - void wait(Microseconds timeout) const; - - /** - * Same as above but also ensures that if the version has changed, it also returns. - */ - void wait(uint64_t prevVersion, Microseconds timeout) const; + void waitUntil(uint64_t prevVersion, Date_t deadline) const; /** * Returns the version for use as an additional wake condition when used above. @@ -131,11 +126,6 @@ public: return _version; } - /** - * Same as above but without a timeout. - */ - void wait() const; - /** * Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting. */ @@ -147,11 +137,6 @@ public: bool isDead(); private: - // Helper for wait impls. - void _wait(stdx::unique_lock& lk, - uint64_t prevVersion, - Microseconds timeout) const; - // Signalled when a successful insert is made into a capped collection. mutable stdx::condition_variable _notifier; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index cc9416cf621..4bab002396f 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -282,19 +282,21 @@ public: if (cursor->isReadCommitted()) uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); - const bool hasOwnMaxTime = opCtx->hasDeadline(); - const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); + // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. - if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) { + if (!opCtx->getClient()->isInDirectClient()) { // There is no time limit set directly on this getMore command. If the cursor is // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { - opCtx->setDeadlineAfterNowBy(Seconds{1}); + opCtx->clearDeadline(); + awaitDataState(opCtx).waitForInsertsDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + + request.awaitDataTimeout.value_or(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); } @@ -336,7 +338,7 @@ public: if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); - shouldWaitForInserts(opCtx) = true; + awaitDataState(opCtx).shouldWaitForInserts = true; } Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); @@ -366,12 +368,7 @@ public: exec->saveState(); exec->detachFromOperationContext(); - // If maxTimeMS was set directly on the getMore rather than being rolled over - // from a previous find, then don't roll remaining micros over to the next - // getMore. - if (!hasOwnMaxTime) { - cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - } + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); cursor->incPos(numResults); } else { @@ -439,7 +436,7 @@ public: } // As soon as we get a result, this operation no longer waits. - shouldWaitForInserts(opCtx) = false; + awaitDataState(opCtx).shouldWaitForInserts = false; // Add result to output buffer. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); nextBatch->append(obj); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 4d5f05ceb85..915b8060981 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -410,6 +410,14 @@ public: return _deadline; } + /** + * Reset the deadline for this operation. + */ + void clearDeadline() { + _deadline = Date_t::max(); + _maxTime = computeMaxTimeFromDeadline(_deadline); + } + /** * Returns the number of milliseconds remaining for this operation's time limit or * Milliseconds::max() if the operation has no time limit. diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index bba8a3dee72..a25926f97e0 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -34,6 +34,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/query/explain.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/scopeguard.h" @@ -104,7 +105,7 @@ void DocumentSourceCursor::loadBatch() { // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and // needs-merge case), batching will result in a wrong time. - if (shouldWaitForInserts(pExpCtx->opCtx) || + if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 50db4cff04b..394f7fa552a 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -428,7 +428,9 @@ Message getMore(OperationContext* opCtx, // the total operation latency. curOp.pauseTimer(); Seconds timeout(1); - notifier->wait(notifierVersion, timeout); + notifier->waitUntil(notifierVersion, + opCtx->getServiceContext()->getPreciseClockSource()->now() + + timeout); notifier.reset(); curOp.resumeTimer(); diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp index fa2049b07a0..b59b6e2a699 100644 --- a/src/mongo/db/query/find_common.cpp +++ b/src/mongo/db/query/find_common.cpp @@ -40,6 +40,9 @@ MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore); MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd); +const OperationContext::Decoration awaitDataState = + OperationContext::declareDecoration(); + bool FindCommon::enoughForFirstBatch(const QueryRequest& qr, long long numDocs) { if (!qr.getEffectiveBatchSize()) { // We enforce a default batch size for the initial find if no batch size is specified. diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h index 657554a0d94..0e83e3cb546 100644 --- a/src/mongo/db/query/find_common.h +++ b/src/mongo/db/query/find_common.h @@ -27,10 +27,30 @@ */ #include "mongo/bson/bsonobj.h" +#include "mongo/db/operation_context.h" #include "mongo/util/fail_point_service.h" namespace mongo { +/** + * The state associated with tailable cursors. + */ +struct AwaitDataState { + /** + * The deadline for how long we wait on the tail of capped collection before returning IS_EOF. + */ + Date_t waitForInsertsDeadline; + + /** + * If true, when no results are available from a plan, then instead of returning immediately, + * the system should wait up to the length of the operation deadline for data to be inserted + * which causes results to become available. + */ + bool shouldWaitForInserts; +}; + +extern const OperationContext::Decoration awaitDataState; + class BSONObj; class QueryRequest; diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index e1ae5988b3a..b2619986a46 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -45,6 +45,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" @@ -62,8 +63,6 @@ using std::string; using std::unique_ptr; using std::vector; -const OperationContext::Decoration shouldWaitForInserts = - OperationContext::declareDecoration(); const OperationContext::Decoration clientsLastKnownCommittedOpTime = OperationContext::declareDecoration(); @@ -424,8 +423,9 @@ bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && - mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && - _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && + awaitDataState(_opCtx).waitForInsertsDeadline > + _opCtx->getServiceContext()->getPreciseClockSource()->now()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -470,15 +470,21 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { - const auto timeout = opCtx->getRemainingMaxTimeMicros(); - notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); + const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; + notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); }); notifierData->lastEOFVersion = currentNotifierVersion; + if (yieldResult.isOK()) { // There may be more results, try to get more data. return ADVANCED; } - return swallowTimeoutIfAwaitData(yieldResult, errorObj); + + if (errorObj) { + *errorObj = Snapshotted(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldResult)); + } + return DEAD; } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted* objOut, RecordId* dlOut) { @@ -534,7 +540,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted* objOut, if (_yieldPolicy->shouldYield()) { auto yieldStatus = _yieldPolicy->yield(fetcher.get()); if (!yieldStatus.isOK()) { - return swallowTimeoutIfAwaitData(yieldStatus, objOut); + if (objOut) { + *objOut = Snapshotted( + SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); + } + return PlanExecutor::DEAD; } } @@ -687,23 +697,6 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } -PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( - Status yieldError, Snapshotted* errorObj) const { - if (yieldError == ErrorCodes::ExceededTimeLimit) { - if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { - // If the cursor is tailable then exceeding the time limit should not destroy this - // PlanExecutor, we should just stop waiting for inserts. - return PlanExecutor::IS_EOF; - } - } - - if (errorObj) { - *errorObj = Snapshotted(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldError)); - } - return PlanExecutor::DEAD; -} - Timestamp PlanExecutor::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast(pipelineProxy)->getLatestOplogTimestamp(); diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index aeed2ea8154..3de96faf96b 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -52,13 +52,6 @@ class RecordId; struct PlanStageStats; class WorkingSet; -/** - * If true, when no results are available from a plan, then instead of returning immediately, the - * system should wait up to the length of the operation deadline for data to be inserted which - * causes results to become available. - */ -extern const OperationContext::Decoration shouldWaitForInserts; - /** * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop * waiting for new data as soon as the committed op time changes. @@ -540,14 +533,6 @@ private: */ Status pickBestPlan(const Collection* collection); - /** - * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor - * represents a tailable, awaitData cursor and whether 'yieldError' is the error object - * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and - * populates 'errorObj' with the error - if 'errorObj' is not null. - */ - ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted* errorObj) const; - // The OperationContext that we're executing within. This can be updated if necessary by using // detachFromOperationContext() and reattachToOperationContext(). OperationContext* _opCtx; -- cgit v1.2.1