diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2017-12-14 14:25:52 -0500 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-01-03 11:23:31 -0500 |
commit | 8d2276dfa94673f0ca1480fc87977a6f36c2816a (patch) | |
tree | 5128dc4b44a8fc060516c1f8745d82cdf099fb00 | |
parent | 879db6231681408b5ca4bba8c49d2d5970986669 (diff) | |
download | mongo-8d2276dfa94673f0ca1480fc87977a6f36c2816a.tar.gz |
SERVER-31684 Fix unexpected "operation exceeded time limit" errors
The changestream queries used an operation context deadline to track
a wait time before returning EOF. This occasionaly interfered with
normal operation deadlines leading to unexpected errors.
(cherry picked from commit 962c5c61c93776aa4d1a8efb67a1a80cb3bb2ad0)
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 19 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/find_common.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/find_common.h | 20 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 15 | ||||
-rw-r--r-- | src/mongo/dbtests/documentsourcetests.cpp | 6 | ||||
-rw-r--r-- | src/mongo/dbtests/query_plan_executor.cpp | 5 |
12 files changed, 74 insertions, 97 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 5d8e64a9bdd..6493d147605 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -152,33 +152,15 @@ void CappedInsertNotifier::notifyAll() { _notifier.notify_all(); } -void CappedInsertNotifier::_wait(stdx::unique_lock<stdx::mutex>& lk, - uint64_t prevVersion, - Microseconds timeout) const { +void CappedInsertNotifier::waitUntil(uint64_t prevVersion, Date_t deadline) const { + stdx::unique_lock<stdx::mutex> lk(_mutex); while (!_dead && prevVersion == _version) { - if (timeout == Microseconds::max()) { - _notifier.wait(lk); - } else if (stdx::cv_status::timeout == _notifier.wait_for(lk, timeout.toSystemDuration())) { + if (stdx::cv_status::timeout == _notifier.wait_until(lk, deadline.toSystemTimePoint())) { return; } } } -void CappedInsertNotifier::wait(uint64_t prevVersion, Microseconds timeout) const { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _wait(lk, prevVersion, timeout); -} - -void CappedInsertNotifier::wait(Microseconds timeout) const { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _wait(lk, _version, timeout); -} - -void CappedInsertNotifier::wait() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _wait(lk, _version, Microseconds::max()); -} - void CappedInsertNotifier::kill() { stdx::lock_guard<stdx::mutex> lk(_mutex); _dead = true; diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 216bfc971af..1e9ad8ff4cd 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -112,17 +112,12 @@ public: void notifyAll(); /** - * Waits for 'timeout' microseconds, or until notifyAll() is called to indicate that new + * Waits until 'deadline', or until notifyAll() is called to indicate that new * data is available in the capped collection. * * NOTE: Waiting threads can be signaled by calling kill or notify* methods. */ - void wait(Microseconds timeout) const; - - /** - * Same as above but also ensures that if the version has changed, it also returns. - */ - void wait(uint64_t prevVersion, Microseconds timeout) const; + void waitUntil(uint64_t prevVersion, Date_t deadline) const; /** * Returns the version for use as an additional wake condition when used above. @@ -132,11 +127,6 @@ public: } /** - * Same as above but without a timeout. - */ - void wait() const; - - /** * Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting. */ void kill(); @@ -147,11 +137,6 @@ public: bool isDead(); private: - // Helper for wait impls. - void _wait(stdx::unique_lock<stdx::mutex>& lk, - uint64_t prevVersion, - Microseconds timeout) const; - // Signalled when a successful insert is made into a capped collection. mutable stdx::condition_variable _notifier; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index cc9416cf621..4bab002396f 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -282,19 +282,21 @@ public: if (cursor->isReadCommitted()) uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); - const bool hasOwnMaxTime = opCtx->hasDeadline(); - const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); + // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. - if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) { + if (!opCtx->getClient()->isInDirectClient()) { // There is no time limit set directly on this getMore command. If the cursor is // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { - opCtx->setDeadlineAfterNowBy(Seconds{1}); + opCtx->clearDeadline(); + awaitDataState(opCtx).waitForInsertsDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + + request.awaitDataTimeout.value_or(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); } @@ -336,7 +338,7 @@ public: if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); - shouldWaitForInserts(opCtx) = true; + awaitDataState(opCtx).shouldWaitForInserts = true; } Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); @@ -366,12 +368,7 @@ public: exec->saveState(); exec->detachFromOperationContext(); - // If maxTimeMS was set directly on the getMore rather than being rolled over - // from a previous find, then don't roll remaining micros over to the next - // getMore. - if (!hasOwnMaxTime) { - cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - } + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); cursor->incPos(numResults); } else { @@ -439,7 +436,7 @@ public: } // As soon as we get a result, this operation no longer waits. - shouldWaitForInserts(opCtx) = false; + awaitDataState(opCtx).shouldWaitForInserts = false; // Add result to output buffer. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); nextBatch->append(obj); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 4d5f05ceb85..915b8060981 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -411,6 +411,14 @@ public: } /** + * Reset the deadline for this operation. + */ + void clearDeadline() { + _deadline = Date_t::max(); + _maxTime = computeMaxTimeFromDeadline(_deadline); + } + + /** * Returns the number of milliseconds remaining for this operation's time limit or * Milliseconds::max() if the operation has no time limit. */ diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index bba8a3dee72..a25926f97e0 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -34,6 +34,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/query/explain.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/scopeguard.h" @@ -104,7 +105,7 @@ void DocumentSourceCursor::loadBatch() { // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and // needs-merge case), batching will result in a wrong time. - if (shouldWaitForInserts(pExpCtx->opCtx) || + if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 50db4cff04b..394f7fa552a 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -428,7 +428,9 @@ Message getMore(OperationContext* opCtx, // the total operation latency. curOp.pauseTimer(); Seconds timeout(1); - notifier->wait(notifierVersion, timeout); + notifier->waitUntil(notifierVersion, + opCtx->getServiceContext()->getPreciseClockSource()->now() + + timeout); notifier.reset(); curOp.resumeTimer(); diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp index fa2049b07a0..b59b6e2a699 100644 --- a/src/mongo/db/query/find_common.cpp +++ b/src/mongo/db/query/find_common.cpp @@ -40,6 +40,9 @@ MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore); MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd); +const OperationContext::Decoration<AwaitDataState> awaitDataState = + OperationContext::declareDecoration<AwaitDataState>(); + bool FindCommon::enoughForFirstBatch(const QueryRequest& qr, long long numDocs) { if (!qr.getEffectiveBatchSize()) { // We enforce a default batch size for the initial find if no batch size is specified. diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h index 657554a0d94..0e83e3cb546 100644 --- a/src/mongo/db/query/find_common.h +++ b/src/mongo/db/query/find_common.h @@ -27,10 +27,30 @@ */ #include "mongo/bson/bsonobj.h" +#include "mongo/db/operation_context.h" #include "mongo/util/fail_point_service.h" namespace mongo { +/** + * The state associated with tailable cursors. + */ +struct AwaitDataState { + /** + * The deadline for how long we wait on the tail of capped collection before returning IS_EOF. + */ + Date_t waitForInsertsDeadline; + + /** + * If true, when no results are available from a plan, then instead of returning immediately, + * the system should wait up to the length of the operation deadline for data to be inserted + * which causes results to become available. + */ + bool shouldWaitForInserts; +}; + +extern const OperationContext::Decoration<AwaitDataState> awaitDataState; + class BSONObj; class QueryRequest; diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index e1ae5988b3a..b2619986a46 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -45,6 +45,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" @@ -62,8 +63,6 @@ using std::string; using std::unique_ptr; using std::vector; -const OperationContext::Decoration<bool> shouldWaitForInserts = - OperationContext::declareDecoration<bool>(); const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = OperationContext::declareDecoration<repl::OpTime>(); @@ -424,8 +423,9 @@ bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && - mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && - _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { + awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && + awaitDataState(_opCtx).waitForInsertsDeadline > + _opCtx->getServiceContext()->getPreciseClockSource()->now()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -470,15 +470,21 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { - const auto timeout = opCtx->getRemainingMaxTimeMicros(); - notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); + const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; + notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); }); notifierData->lastEOFVersion = currentNotifierVersion; + if (yieldResult.isOK()) { // There may be more results, try to get more data. return ADVANCED; } - return swallowTimeoutIfAwaitData(yieldResult, errorObj); + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldResult)); + } + return DEAD; } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -534,7 +540,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, if (_yieldPolicy->shouldYield()) { auto yieldStatus = _yieldPolicy->yield(fetcher.get()); if (!yieldStatus.isOK()) { - return swallowTimeoutIfAwaitData(yieldStatus, objOut); + if (objOut) { + *objOut = Snapshotted<BSONObj>( + SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); + } + return PlanExecutor::DEAD; } } @@ -687,23 +697,6 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } -PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( - Status yieldError, Snapshotted<BSONObj>* errorObj) const { - if (yieldError == ErrorCodes::ExceededTimeLimit) { - if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { - // If the cursor is tailable then exceeding the time limit should not destroy this - // PlanExecutor, we should just stop waiting for inserts. - return PlanExecutor::IS_EOF; - } - } - - if (errorObj) { - *errorObj = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldError)); - } - return PlanExecutor::DEAD; -} - Timestamp PlanExecutor::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index aeed2ea8154..3de96faf96b 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -53,13 +53,6 @@ struct PlanStageStats; class WorkingSet; /** - * If true, when no results are available from a plan, then instead of returning immediately, the - * system should wait up to the length of the operation deadline for data to be inserted which - * causes results to become available. - */ -extern const OperationContext::Decoration<bool> shouldWaitForInserts; - -/** * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop * waiting for new data as soon as the committed op time changes. * @@ -540,14 +533,6 @@ private: */ Status pickBestPlan(const Collection* collection); - /** - * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor - * represents a tailable, awaitData cursor and whether 'yieldError' is the error object - * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and - * populates 'errorObj' with the error - if 'errorObj' is not null. - */ - ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted<BSONObj>* errorObj) const; - // The OperationContext that we're executing within. This can be updated if necessary by using // detachFromOperationContext() and reattachToOperationContext(). OperationContext* _opCtx; diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 97ddc5057df..db9ac3f47a6 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -311,7 +311,7 @@ TEST_F(DocumentSourceCursorTest, SerializationRespectsExplainModes) { source()->dispose(); } -TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) { +TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) { // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. const bool capped = true; const long long cappedSize = 1024; @@ -348,8 +348,8 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) auto cursor = DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); - ASSERT(cursor->getNext().isEOF()); - cursor->dispose(); + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); } TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 99a45b8a402..8f8025e4b9d 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -296,7 +296,7 @@ TEST_F(PlanExecutorTest, ShouldReportErrorIfExceedsTimeLimitDuringYield) { ASSERT_EQ(ErrorCodes::ExceededTimeLimit, WorkingSetCommon::getMemberObjectStatus(resultObj)); } -TEST_F(PlanExecutorTest, ShouldReportEOFIfExceedsTimeLimitDuringYieldButIsTailableAndAwaitData) { +TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYieldButIsTailableAndAwaitData) { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("_id" << 1)); insert(BSON("_id" << 2)); @@ -310,7 +310,8 @@ TEST_F(PlanExecutorTest, ShouldReportEOFIfExceedsTimeLimitDuringYieldButIsTailab TailableMode::kTailableAndAwaitData); BSONObj resultObj; - ASSERT_EQ(PlanExecutor::IS_EOF, exec->getNext(&resultObj, nullptr)); + ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, WorkingSetCommon::getMemberObjectStatus(resultObj)); } TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailableButNotAwaitData) { |