diff options
author | Spencer Jackson <spencer.jackson@mongodb.com> | 2017-12-14 14:32:18 -0500 |
---|---|---|
committer | Spencer Jackson <spencer.jackson@mongodb.com> | 2017-12-14 14:32:18 -0500 |
commit | 1f38fb202b9f8696cf28d39e674242e036c0b75c (patch) | |
tree | 249a9c923a97dd7e37d8898c3782bd27a6dd53a3 /src/mongo | |
parent | 47247293f18ea581954f6fcf4c0018b7828e3c3a (diff) | |
download | mongo-1f38fb202b9f8696cf28d39e674242e036c0b75c.tar.gz |
Revert "SERVER-31684 Fix unexpected "operation exceeded time limit" errors"
This reverts commit b79e5f04ffc79b5892f89c22b9e5f26a297b1185.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 19 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/find_common.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/find_common.h | 20 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 15 |
10 files changed, 92 insertions, 68 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 6493d147605..5d8e64a9bdd 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -152,15 +152,33 @@ void CappedInsertNotifier::notifyAll() { _notifier.notify_all(); } -void CappedInsertNotifier::waitUntil(uint64_t prevVersion, Date_t deadline) const { - stdx::unique_lock<stdx::mutex> lk(_mutex); +void CappedInsertNotifier::_wait(stdx::unique_lock<stdx::mutex>& lk, + uint64_t prevVersion, + Microseconds timeout) const { while (!_dead && prevVersion == _version) { - if (stdx::cv_status::timeout == _notifier.wait_until(lk, deadline.toSystemTimePoint())) { + if (timeout == Microseconds::max()) { + _notifier.wait(lk); + } else if (stdx::cv_status::timeout == _notifier.wait_for(lk, timeout.toSystemDuration())) { return; } } } +void CappedInsertNotifier::wait(uint64_t prevVersion, Microseconds timeout) const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _wait(lk, prevVersion, timeout); +} + +void CappedInsertNotifier::wait(Microseconds timeout) const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _wait(lk, _version, timeout); +} + +void CappedInsertNotifier::wait() const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _wait(lk, _version, Microseconds::max()); +} + void CappedInsertNotifier::kill() { stdx::lock_guard<stdx::mutex> lk(_mutex); _dead = true; diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 1e9ad8ff4cd..216bfc971af 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -112,12 +112,17 @@ public: void notifyAll(); /** - * Waits until 'deadline', or until notifyAll() is called to indicate that new + * Waits for 'timeout' microseconds, or until notifyAll() is called to indicate that new * data is available in the capped collection. * * NOTE: Waiting threads can be signaled by calling kill or notify* methods. */ - void waitUntil(uint64_t prevVersion, Date_t deadline) const; + void wait(Microseconds timeout) const; + + /** + * Same as above but also ensures that if the version has changed, it also returns. + */ + void wait(uint64_t prevVersion, Microseconds timeout) const; /** * Returns the version for use as an additional wake condition when used above. @@ -127,6 +132,11 @@ public: } /** + * Same as above but without a timeout. + */ + void wait() const; + + /** * Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting. */ void kill(); @@ -137,6 +147,11 @@ public: bool isDead(); private: + // Helper for wait impls. + void _wait(stdx::unique_lock<stdx::mutex>& lk, + uint64_t prevVersion, + Microseconds timeout) const; + // Signalled when a successful insert is made into a capped collection. mutable stdx::condition_variable _notifier; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 4bab002396f..cc9416cf621 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -282,21 +282,19 @@ public: if (cursor->isReadCommitted()) uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + const bool hasOwnMaxTime = opCtx->hasDeadline(); + const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); - // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. - if (!opCtx->getClient()->isInDirectClient()) { + if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) { // There is no time limit set directly on this getMore command. If the cursor is // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { - opCtx->clearDeadline(); - awaitDataState(opCtx).waitForInsertsDeadline = - opCtx->getServiceContext()->getPreciseClockSource()->now() + - request.awaitDataTimeout.value_or(Seconds{1}); + opCtx->setDeadlineAfterNowBy(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); } @@ -338,7 +336,7 @@ public: if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); - awaitDataState(opCtx).shouldWaitForInserts = true; + shouldWaitForInserts(opCtx) = true; } Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); @@ -368,7 +366,12 @@ public: exec->saveState(); exec->detachFromOperationContext(); - cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + // If maxTimeMS was set directly on the getMore rather than being rolled over + // from a previous find, then don't roll remaining micros over to the next + // getMore. + if (!hasOwnMaxTime) { + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + } cursor->incPos(numResults); } else { @@ -436,7 +439,7 @@ public: } // As soon as we get a result, this operation no longer waits. - awaitDataState(opCtx).shouldWaitForInserts = false; + shouldWaitForInserts(opCtx) = false; // Add result to output buffer. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); nextBatch->append(obj); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 915b8060981..4d5f05ceb85 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -411,14 +411,6 @@ public: } /** - * Reset the deadline for this operation. - */ - void clearDeadline() { - _deadline = Date_t::max(); - _maxTime = computeMaxTimeFromDeadline(_deadline); - } - - /** * Returns the number of milliseconds remaining for this operation's time limit or * Milliseconds::max() if the operation has no time limit. */ diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index a25926f97e0..bba8a3dee72 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -34,7 +34,6 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/query/explain.h" -#include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/scopeguard.h" @@ -105,7 +104,7 @@ void DocumentSourceCursor::loadBatch() { // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and // needs-merge case), batching will result in a wrong time. - if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || + if (shouldWaitForInserts(pExpCtx->opCtx) || (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 394f7fa552a..50db4cff04b 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -428,9 +428,7 @@ Message getMore(OperationContext* opCtx, // the total operation latency. curOp.pauseTimer(); Seconds timeout(1); - notifier->waitUntil(notifierVersion, - opCtx->getServiceContext()->getPreciseClockSource()->now() + - timeout); + notifier->wait(notifierVersion, timeout); notifier.reset(); curOp.resumeTimer(); diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp index b59b6e2a699..fa2049b07a0 100644 --- a/src/mongo/db/query/find_common.cpp +++ b/src/mongo/db/query/find_common.cpp @@ -40,9 +40,6 @@ MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore); MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd); -const OperationContext::Decoration<AwaitDataState> awaitDataState = - OperationContext::declareDecoration<AwaitDataState>(); - bool FindCommon::enoughForFirstBatch(const QueryRequest& qr, long long numDocs) { if (!qr.getEffectiveBatchSize()) { // We enforce a default batch size for the initial find if no batch size is specified. diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h index 0e83e3cb546..657554a0d94 100644 --- a/src/mongo/db/query/find_common.h +++ b/src/mongo/db/query/find_common.h @@ -27,30 +27,10 @@ */ #include "mongo/bson/bsonobj.h" -#include "mongo/db/operation_context.h" #include "mongo/util/fail_point_service.h" namespace mongo { -/** - * The state associated with tailable cursors. - */ -struct AwaitDataState { - /** - * The deadline for how long we wait on the tail of capped collection before returning IS_EOF. - */ - Date_t waitForInsertsDeadline; - - /** - * If true, when no results are available from a plan, then instead of returning immediately, - * the system should wait up to the length of the operation deadline for data to be inserted - * which causes results to become available. - */ - bool shouldWaitForInserts; -}; - -extern const OperationContext::Decoration<AwaitDataState> awaitDataState; - class BSONObj; class QueryRequest; diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index b2619986a46..e1ae5988b3a 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -45,7 +45,6 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" -#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/repl/replication_coordinator.h" @@ -63,6 +62,8 @@ using std::string; using std::unique_ptr; using std::vector; +const OperationContext::Decoration<bool> shouldWaitForInserts = + OperationContext::declareDecoration<bool>(); const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = OperationContext::declareDecoration<repl::OpTime>(); @@ -423,9 +424,8 @@ bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && - awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && - awaitDataState(_opCtx).waitForInsertsDeadline > - _opCtx->getServiceContext()->getPreciseClockSource()->now()) { + mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && + _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { // We expect awaitData cursors to be yielding. invariant(_yieldPolicy->canReleaseLocksDuringExecution()); @@ -470,21 +470,15 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n auto opCtx = _opCtx; uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] { - const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + const auto timeout = opCtx->getRemainingMaxTimeMicros(); + notifierData->notifier->wait(notifierData->lastEOFVersion, timeout); }); notifierData->lastEOFVersion = currentNotifierVersion; - if (yieldResult.isOK()) { // There may be more results, try to get more data. return ADVANCED; } - - if (errorObj) { - *errorObj = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldResult)); - } - return DEAD; + return swallowTimeoutIfAwaitData(yieldResult, errorObj); } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { @@ -540,11 +534,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, if (_yieldPolicy->shouldYield()) { auto yieldStatus = _yieldPolicy->yield(fetcher.get()); if (!yieldStatus.isOK()) { - if (objOut) { - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); - } - return PlanExecutor::DEAD; + return swallowTimeoutIfAwaitData(yieldStatus, objOut); } } @@ -697,6 +687,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } +PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( + Status yieldError, Snapshotted<BSONObj>* errorObj) const { + if (yieldError == ErrorCodes::ExceededTimeLimit) { + if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { + // If the cursor is tailable then exceeding the time limit should not destroy this + // PlanExecutor, we should just stop waiting for inserts. + return PlanExecutor::IS_EOF; + } + } + + if (errorObj) { + *errorObj = Snapshotted<BSONObj>(SnapshotId(), + WorkingSetCommon::buildMemberStatusObject(yieldError)); + } + return PlanExecutor::DEAD; +} + Timestamp PlanExecutor::getLatestOplogTimestamp() { if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 3de96faf96b..aeed2ea8154 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -53,6 +53,13 @@ struct PlanStageStats; class WorkingSet; /** + * If true, when no results are available from a plan, then instead of returning immediately, the + * system should wait up to the length of the operation deadline for data to be inserted which + * causes results to become available. + */ +extern const OperationContext::Decoration<bool> shouldWaitForInserts; + +/** * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop * waiting for new data as soon as the committed op time changes. * @@ -533,6 +540,14 @@ private: */ Status pickBestPlan(const Collection* collection); + /** + * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor + * represents a tailable, awaitData cursor and whether 'yieldError' is the error object + * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and + * populates 'errorObj' with the error - if 'errorObj' is not null. + */ + ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted<BSONObj>* errorObj) const; + // The OperationContext that we're executing within. This can be updated if necessary by using // detachFromOperationContext() and reattachToOperationContext(). OperationContext* _opCtx; |