diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-01-18 12:04:01 -0500 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-01-31 15:05:08 -0500 |
commit | 15a7ac9ca54f2d580e2b1d1ab01fe095be1233db (patch) | |
tree | c3628b0d5be60aa6bd911c8c351daa9606f6b060 /src | |
parent | 3c349c50d8d5a55fa80c1d7ae3ac6a6f6cc82b5e (diff) | |
download | mongo-15a7ac9ca54f2d580e2b1d1ab01fe095be1233db.tar.gz |
SERVER-31484 separate the operation deadline from awaitData deadline in sharded queries.
The deadline has been been already separated for non-sharded queries.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/operation_context.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 23 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 31 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 24 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 15 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_update_on_add_shard.cpp | 3 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.cpp | 6 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.h | 4 |
15 files changed, 83 insertions, 85 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 75046690728..a647d0fcbd0 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -101,17 +101,6 @@ Microseconds OperationContext::computeMaxTimeFromDeadline(Date_t when) { return maxTime; } -OperationContext::DeadlineStash::DeadlineStash(OperationContext* opCtx) - : _opCtx(opCtx), _originalDeadline(_opCtx->getDeadline()) { - _opCtx->_deadline = Date_t::max(); - _opCtx->_maxTime = _opCtx->computeMaxTimeFromDeadline(Date_t::max()); -} - -OperationContext::DeadlineStash::~DeadlineStash() { - _opCtx->_deadline = _originalDeadline; - _opCtx->_maxTime = _opCtx->computeMaxTimeFromDeadline(_originalDeadline); -} - void OperationContext::setDeadlineByDate(Date_t when) { setDeadlineAndMaxTime(when, computeMaxTimeFromDeadline(when)); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 915b8060981..e0228810713 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -84,28 +84,6 @@ public: kFailedUnitOfWork // in a unit of work that has failed and must be aborted }; - /** - * An RAII type that will temporarily suspend any deadline on this operation. Resets the - * deadline to the previous value upon destruction. - */ - class DeadlineStash { - public: - /** - * Clears any deadline set on this operation. - */ - DeadlineStash(OperationContext* opCtx); - - /** - * Resets the deadline on '_opCtx' to the original deadline present at the time this - * DeadlineStash was constructed. - */ - ~DeadlineStash(); - - private: - OperationContext* _opCtx; - Date_t _originalDeadline; - }; - OperationContext(Client* client, unsigned int opId); virtual ~OperationContext() = default; @@ -464,7 +442,6 @@ private: _writesAreReplicated = writesAreReplicated; } - friend class DeadlineStash; friend class WriteUnitOfWork; friend class repl::UnreplicatedWritesBlock; Client* const _client; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index 89af387cf42..b0141c7988f 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -68,9 +68,6 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() { return input; } - // Temporarily remove any deadline from this operation to avoid timeout during lookup. - OperationContext::DeadlineStash deadlineStash(pExpCtx->opCtx); - MutableDocument output(input.releaseDocument()); output[kFullDocumentFieldName] = lookupPostImage(output.peek()); return output.freeze(); diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 361542fd2db..d7098cfee77 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -84,13 +84,7 @@ void ExpressionContext::checkForInterrupt() { if (--_interruptCounter == 0) { invariant(opCtx); _interruptCounter = kInterruptCheckPeriod; - auto interruptStatus = opCtx->checkForInterruptNoAssert(); - if (interruptStatus == ErrorCodes::ExceededTimeLimit && isTailableAwaitData()) { - // Don't respect deadline expiration during the pipeline when the cursor is - // tailable and awaitdata. - return; - } - uassertStatusOK(interruptStatus); + opCtx->checkForInterrupt(); } } diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp index c671edc56e8..5ba9ac969d1 100644 --- a/src/mongo/db/s/sharding_task_executor.cpp +++ b/src/mongo/db/s/sharding_task_executor.cpp @@ -93,8 +93,10 @@ void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { _executor->waitForEvent(event); } -Status ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, const EventHandle& event) { - return _executor->waitForEvent(opCtx, event); +StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) { + return _executor->waitForEvent(opCtx, event, deadline); } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork( diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h index 37ac1b73213..4c2571c684a 100644 --- a/src/mongo/db/s/sharding_task_executor.h +++ b/src/mongo/db/s/sharding_task_executor.h @@ -62,7 +62,9 @@ public: void signalEvent(const EventHandle& event) override; StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; void waitForEvent(const EventHandle& event) override; - Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override; + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 43a3ee81d9e..a36c9544ac3 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -39,6 +39,7 @@ #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/platform/hash_namespace.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" @@ -185,11 +186,12 @@ public: virtual void waitForEvent(const EventHandle& event) = 0; /** - * Same as waitForEvent without an OperationContext, but returns an error if the event was not - * triggered but the operation was killed - see OperationContext::checkForInterruptNoAssert() - * for expected error codes. + * Same as waitForEvent without an OperationContext, but returns Status::OK with + * cv_status::timeout if the event was not triggered within deadline. */ - virtual Status waitForEvent(OperationContext* opCtx, const EventHandle& event) = 0; + virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) = 0; /** * Schedules "work" to be run by the executor ASAP. diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 2185c221cf0..acc95e57754 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -350,10 +350,35 @@ COMMON_EXECUTOR_TEST(EventWaitingWithTimeoutTest) { auto client = serviceContext->makeClient("for testing"); auto opCtx = client->makeOperationContext(); - opCtx->setDeadlineAfterNowBy(Milliseconds{1}); + auto deadline = mockClock->now() + Milliseconds{1}; mockClock->advance(Milliseconds(2)); - ASSERT_EQ(ErrorCodes::ExceededTimeLimit, - executor.waitForEvent(opCtx.get(), eventThatWillNeverBeTriggered)); + ASSERT(stdx::cv_status::timeout == + executor.waitForEvent(opCtx.get(), eventThatWillNeverBeTriggered, deadline)); + executor.shutdown(); + joinExecutorThread(); +} + +COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) { + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + + auto eventSignalled = unittest::assertGet(executor.makeEvent()); + + auto serviceContext = getGlobalServiceContext(); + + serviceContext->setFastClockSource(stdx::make_unique<ClockSourceMock>()); + auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource()); + + auto client = serviceContext->makeClient("for testing"); + auto opCtx = client->makeOperationContext(); + + auto deadline = mockClock->now() + Milliseconds{1}; + mockClock->advance(Milliseconds(1)); + + executor.signalEvent(eventSignalled); + + ASSERT(stdx::cv_status::no_timeout == + executor.waitForEvent(opCtx.get(), eventSignalled, deadline)); executor.shutdown(); joinExecutorThread(); } diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 4e6dc5f45fb..202de888b5a 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -285,22 +285,26 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E return cbHandle; } -Status ThreadPoolTaskExecutor::waitForEvent(OperationContext* opCtx, const EventHandle& event) { +StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) { invariant(opCtx); invariant(event.isValid()); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); stdx::unique_lock<stdx::mutex> lk(_mutex); - try { - // std::condition_variable::wait() can wake up spuriously, so provide a callback to detect - // when that happens and go back to waiting. - opCtx->waitForConditionOrInterrupt(eventState->isSignaledCondition, lk, [&eventState]() { - return eventState->isSignaledFlag; - }); - } catch (const DBException& e) { - return e.toStatus(); + // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event + // is signalled or we time out. + while (!eventState->isSignaledFlag) { + auto status = opCtx->waitForConditionOrInterruptNoAssertUntil( + eventState->isSignaledCondition, lk, deadline); + + if (!status.isOK() || stdx::cv_status::timeout == status) { + return status; + } } - return Status::OK(); + + return stdx::cv_status::no_timeout; } void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index b443061945e..8e81e3a7f07 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -73,7 +73,9 @@ public: StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; - Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override; + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; void waitForEvent(const EventHandle& event) override; StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index f8e7a69ed33..a2b5b105ab5 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -415,13 +415,16 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, ReadPreferenceSetting::get(opCtx) = *readPref; } if (pinnedCursor.getValue().isTailableAndAwaitData()) { - // Default to 1-second timeout for tailable awaitData cursors. If an explicit maxTimeMS has - // been specified, do not apply it to the opCtx, since its deadline will already have been - // set during command processing. + // A maxTimeMS specified on a tailable, awaitData cursor is special. Instead of imposing a + // deadline on the operation, it is used to communicate how long the server should wait for + // new results. Here we clear any deadline set during command processing and track the + // deadline instead via the 'waitForInsertsDeadline' decoration. This deadline defaults to + // 1 second if the user didn't specify a maxTimeMS. + opCtx->clearDeadline(); auto timeout = request.awaitDataTimeout.value_or(Milliseconds{1000}); - if (!request.awaitDataTimeout) { - opCtx->setDeadlineAfterNowBy(timeout); - } + awaitDataState(opCtx).waitForInsertsDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + timeout; + invariant(pinnedCursor.getValue().setAwaitDataTimeout(timeout).isOK()); } else if (request.awaitDataTimeout) { return {ErrorCodes::BadValue, diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index e571ed4ed2e..31c200bb004 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -78,18 +78,18 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex auto event = nextEventStatus.getValue(); // Block until there are further results to return, or our time limit is exceeded. - auto waitStatus = _executor->waitForEvent(getOpCtx(), event); + auto waitStatus = _executor->waitForEvent( + getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline); - // Swallow ExceededTimeLimit errors for tailable awaitData cursors, stash the event - // that we were waiting on, and return EOF. - if (waitStatus == ErrorCodes::ExceededTimeLimit) { + if (!waitStatus.isOK()) { + return waitStatus.getStatus(); + } + // Swallow timeout errors for tailable awaitData cursors, stash the event that we were + // waiting on, and return EOF. + if (waitStatus == stdx::cv_status::timeout) { _leftoverEventFromLastTimeout = std::move(event); return ClusterQueryResult{}; } - - if (!waitStatus.isOK()) { - return waitStatus; - } } // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp index eef17f80a92..3d9ddb3428c 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp +++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp @@ -86,9 +86,6 @@ std::vector<ClusterClientCursorParams::RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj) { auto* opCtx = getOpCtx(); - // Temporarily remove any deadline from this operation to avoid timing out while creating new - // cursors. - OperationContext::DeadlineStash deadlineStash(opCtx); // Reload the shard registry. We need to ensure a reload initiated after calling this method // caused the reload, otherwise we aren't guaranteed to get all the new shards. auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 189b68f4b25..03a9c90246e 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -82,8 +82,10 @@ void TaskExecutorProxy::waitForEvent(const EventHandle& event) { _executor->waitForEvent(event); } -Status TaskExecutorProxy::waitForEvent(OperationContext* opCtx, const EventHandle& event) { - return _executor->waitForEvent(opCtx, event); +StatusWith<stdx::cv_status> TaskExecutorProxy::waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) { + return _executor->waitForEvent(opCtx, event, deadline); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork( diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index 3607d5fa7df..fdcbc9c71d4 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -60,7 +60,9 @@ public: virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; virtual void waitForEvent(const EventHandle& event) override; - virtual Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override; + virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; virtual StatusWith<CallbackHandle> scheduleRemoteCommand( |