summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/operation_context.cpp11
-rw-r--r--src/mongo/db/operation_context.h23
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp3
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp8
-rw-r--r--src/mongo/db/s/sharding_task_executor.cpp6
-rw-r--r--src/mongo/db/s/sharding_task_executor.h4
-rw-r--r--src/mongo/executor/task_executor.h10
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp31
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp24
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h4
-rw-r--r--src/mongo/s/query/cluster_find.cpp15
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp16
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.cpp3
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp6
-rw-r--r--src/mongo/unittest/task_executor_proxy.h4
15 files changed, 83 insertions, 85 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 75046690728..a647d0fcbd0 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -101,17 +101,6 @@ Microseconds OperationContext::computeMaxTimeFromDeadline(Date_t when) {
return maxTime;
}
-OperationContext::DeadlineStash::DeadlineStash(OperationContext* opCtx)
- : _opCtx(opCtx), _originalDeadline(_opCtx->getDeadline()) {
- _opCtx->_deadline = Date_t::max();
- _opCtx->_maxTime = _opCtx->computeMaxTimeFromDeadline(Date_t::max());
-}
-
-OperationContext::DeadlineStash::~DeadlineStash() {
- _opCtx->_deadline = _originalDeadline;
- _opCtx->_maxTime = _opCtx->computeMaxTimeFromDeadline(_originalDeadline);
-}
-
void OperationContext::setDeadlineByDate(Date_t when) {
setDeadlineAndMaxTime(when, computeMaxTimeFromDeadline(when));
}
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 915b8060981..e0228810713 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -84,28 +84,6 @@ public:
kFailedUnitOfWork // in a unit of work that has failed and must be aborted
};
- /**
- * An RAII type that will temporarily suspend any deadline on this operation. Resets the
- * deadline to the previous value upon destruction.
- */
- class DeadlineStash {
- public:
- /**
- * Clears any deadline set on this operation.
- */
- DeadlineStash(OperationContext* opCtx);
-
- /**
- * Resets the deadline on '_opCtx' to the original deadline present at the time this
- * DeadlineStash was constructed.
- */
- ~DeadlineStash();
-
- private:
- OperationContext* _opCtx;
- Date_t _originalDeadline;
- };
-
OperationContext(Client* client, unsigned int opId);
virtual ~OperationContext() = default;
@@ -464,7 +442,6 @@ private:
_writesAreReplicated = writesAreReplicated;
}
- friend class DeadlineStash;
friend class WriteUnitOfWork;
friend class repl::UnreplicatedWritesBlock;
Client* const _client;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index 89af387cf42..b0141c7988f 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -68,9 +68,6 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() {
return input;
}
- // Temporarily remove any deadline from this operation to avoid timeout during lookup.
- OperationContext::DeadlineStash deadlineStash(pExpCtx->opCtx);
-
MutableDocument output(input.releaseDocument());
output[kFullDocumentFieldName] = lookupPostImage(output.peek());
return output.freeze();
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 361542fd2db..d7098cfee77 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -84,13 +84,7 @@ void ExpressionContext::checkForInterrupt() {
if (--_interruptCounter == 0) {
invariant(opCtx);
_interruptCounter = kInterruptCheckPeriod;
- auto interruptStatus = opCtx->checkForInterruptNoAssert();
- if (interruptStatus == ErrorCodes::ExceededTimeLimit && isTailableAwaitData()) {
- // Don't respect deadline expiration during the pipeline when the cursor is
- // tailable and awaitdata.
- return;
- }
- uassertStatusOK(interruptStatus);
+ opCtx->checkForInterrupt();
}
}
diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp
index c671edc56e8..5ba9ac969d1 100644
--- a/src/mongo/db/s/sharding_task_executor.cpp
+++ b/src/mongo/db/s/sharding_task_executor.cpp
@@ -93,8 +93,10 @@ void ShardingTaskExecutor::waitForEvent(const EventHandle& event) {
_executor->waitForEvent(event);
}
-Status ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, const EventHandle& event) {
- return _executor->waitForEvent(opCtx, event);
+StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) {
+ return _executor->waitForEvent(opCtx, event, deadline);
}
StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(
diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h
index 37ac1b73213..4c2571c684a 100644
--- a/src/mongo/db/s/sharding_task_executor.h
+++ b/src/mongo/db/s/sharding_task_executor.h
@@ -62,7 +62,9 @@ public:
void signalEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
void waitForEvent(const EventHandle& event) override;
- Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override;
+ StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 43a3ee81d9e..a36c9544ac3 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -39,6 +39,7 @@
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/platform/hash_namespace.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"
@@ -185,11 +186,12 @@ public:
virtual void waitForEvent(const EventHandle& event) = 0;
/**
- * Same as waitForEvent without an OperationContext, but returns an error if the event was not
- * triggered but the operation was killed - see OperationContext::checkForInterruptNoAssert()
- * for expected error codes.
+ * Same as waitForEvent without an OperationContext, but returns Status::OK with
+ * cv_status::timeout if the event was not triggered within deadline.
*/
- virtual Status waitForEvent(OperationContext* opCtx, const EventHandle& event) = 0;
+ virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) = 0;
/**
* Schedules "work" to be run by the executor ASAP.
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index 2185c221cf0..acc95e57754 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -350,10 +350,35 @@ COMMON_EXECUTOR_TEST(EventWaitingWithTimeoutTest) {
auto client = serviceContext->makeClient("for testing");
auto opCtx = client->makeOperationContext();
- opCtx->setDeadlineAfterNowBy(Milliseconds{1});
+ auto deadline = mockClock->now() + Milliseconds{1};
mockClock->advance(Milliseconds(2));
- ASSERT_EQ(ErrorCodes::ExceededTimeLimit,
- executor.waitForEvent(opCtx.get(), eventThatWillNeverBeTriggered));
+ ASSERT(stdx::cv_status::timeout ==
+ executor.waitForEvent(opCtx.get(), eventThatWillNeverBeTriggered, deadline));
+ executor.shutdown();
+ joinExecutorThread();
+}
+
+COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) {
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+
+ auto eventSignalled = unittest::assertGet(executor.makeEvent());
+
+ auto serviceContext = getGlobalServiceContext();
+
+ serviceContext->setFastClockSource(stdx::make_unique<ClockSourceMock>());
+ auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource());
+
+ auto client = serviceContext->makeClient("for testing");
+ auto opCtx = client->makeOperationContext();
+
+ auto deadline = mockClock->now() + Milliseconds{1};
+ mockClock->advance(Milliseconds(1));
+
+ executor.signalEvent(eventSignalled);
+
+ ASSERT(stdx::cv_status::no_timeout ==
+ executor.waitForEvent(opCtx.get(), eventSignalled, deadline));
executor.shutdown();
joinExecutorThread();
}
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 4e6dc5f45fb..202de888b5a 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -285,22 +285,26 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
return cbHandle;
}
-Status ThreadPoolTaskExecutor::waitForEvent(OperationContext* opCtx, const EventHandle& event) {
+StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) {
invariant(opCtx);
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
stdx::unique_lock<stdx::mutex> lk(_mutex);
- try {
- // std::condition_variable::wait() can wake up spuriously, so provide a callback to detect
- // when that happens and go back to waiting.
- opCtx->waitForConditionOrInterrupt(eventState->isSignaledCondition, lk, [&eventState]() {
- return eventState->isSignaledFlag;
- });
- } catch (const DBException& e) {
- return e.toStatus();
+ // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
+ // is signalled or we time out.
+ while (!eventState->isSignaledFlag) {
+ auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(
+ eventState->isSignaledCondition, lk, deadline);
+
+ if (!status.isOK() || stdx::cv_status::timeout == status) {
+ return status;
+ }
}
- return Status::OK();
+
+ return stdx::cv_status::no_timeout;
}
void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index b443061945e..8e81e3a7f07 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -73,7 +73,9 @@ public:
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
- Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override;
+ StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) override;
void waitForEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index f8e7a69ed33..a2b5b105ab5 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -415,13 +415,16 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
ReadPreferenceSetting::get(opCtx) = *readPref;
}
if (pinnedCursor.getValue().isTailableAndAwaitData()) {
- // Default to 1-second timeout for tailable awaitData cursors. If an explicit maxTimeMS has
- // been specified, do not apply it to the opCtx, since its deadline will already have been
- // set during command processing.
+ // A maxTimeMS specified on a tailable, awaitData cursor is special. Instead of imposing a
+ // deadline on the operation, it is used to communicate how long the server should wait for
+ // new results. Here we clear any deadline set during command processing and track the
+ // deadline instead via the 'waitForInsertsDeadline' decoration. This deadline defaults to
+ // 1 second if the user didn't specify a maxTimeMS.
+ opCtx->clearDeadline();
auto timeout = request.awaitDataTimeout.value_or(Milliseconds{1000});
- if (!request.awaitDataTimeout) {
- opCtx->setDeadlineAfterNowBy(timeout);
- }
+ awaitDataState(opCtx).waitForInsertsDeadline =
+ opCtx->getServiceContext()->getPreciseClockSource()->now() + timeout;
+
invariant(pinnedCursor.getValue().setAwaitDataTimeout(timeout).isOK());
} else if (request.awaitDataTimeout) {
return {ErrorCodes::BadValue,
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index e571ed4ed2e..31c200bb004 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -78,18 +78,18 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex
auto event = nextEventStatus.getValue();
// Block until there are further results to return, or our time limit is exceeded.
- auto waitStatus = _executor->waitForEvent(getOpCtx(), event);
+ auto waitStatus = _executor->waitForEvent(
+ getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline);
- // Swallow ExceededTimeLimit errors for tailable awaitData cursors, stash the event
- // that we were waiting on, and return EOF.
- if (waitStatus == ErrorCodes::ExceededTimeLimit) {
+ if (!waitStatus.isOK()) {
+ return waitStatus.getStatus();
+ }
+ // Swallow timeout errors for tailable awaitData cursors, stash the event that we were
+ // waiting on, and return EOF.
+ if (waitStatus == stdx::cv_status::timeout) {
_leftoverEventFromLastTimeout = std::move(event);
return ClusterQueryResult{};
}
-
- if (!waitStatus.isOK()) {
- return waitStatus;
- }
}
// We reach this point either if the ARM is ready, or if the ARM is !ready and we are in
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
index eef17f80a92..3d9ddb3428c 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -86,9 +86,6 @@ std::vector<ClusterClientCursorParams::RemoteCursor>
RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
const BSONObj& newShardDetectedObj) {
auto* opCtx = getOpCtx();
- // Temporarily remove any deadline from this operation to avoid timing out while creating new
- // cursors.
- OperationContext::DeadlineStash deadlineStash(opCtx);
// Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index 189b68f4b25..03a9c90246e 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -82,8 +82,10 @@ void TaskExecutorProxy::waitForEvent(const EventHandle& event) {
_executor->waitForEvent(event);
}
-Status TaskExecutorProxy::waitForEvent(OperationContext* opCtx, const EventHandle& event) {
- return _executor->waitForEvent(opCtx, event);
+StatusWith<stdx::cv_status> TaskExecutorProxy::waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) {
+ return _executor->waitForEvent(opCtx, event, deadline);
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork(
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index 3607d5fa7df..fdcbc9c71d4 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -60,7 +60,9 @@ public:
virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
const CallbackFn& work) override;
virtual void waitForEvent(const EventHandle& event) override;
- virtual Status waitForEvent(OperationContext* opCtx, const EventHandle& event) override;
+ virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) override;
virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
virtual StatusWith<CallbackHandle> scheduleRemoteCommand(