summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-04-27 18:32:16 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-28 16:45:49 +0000
commitbbc0199f6084ac56dc5ff28f41a277d0d678540b (patch)
treefc4f98fe28ae707e0fd97581df0169de2068290e /src/mongo/executor
parent417e36920af24736f7ebdd107d9b3258290b5393 (diff)
downloadmongo-bbc0199f6084ac56dc5ff28f41a277d0d678540b.tar.gz
SERVER-47815 Ensure all exhaust tasks are erased from task executor queue
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp27
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h3
2 files changed, 20 insertions, 10 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 3db33453800..765c4f459a7 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -765,12 +765,13 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
stdx::unique_lock<Latch> lk) {
_poolInProgressQueue.push_back(cbState);
cbState->exhaustIter = --_poolInProgressQueue.end();
+ auto expectedExhaustIter = cbState->exhaustIter.get();
lk.unlock();
if (cbState->baton) {
- cbState->baton->schedule([this, cbState](Status status) {
+ cbState->baton->schedule([this, cbState, expectedExhaustIter](Status status) {
if (status.isOK()) {
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
return;
}
@@ -779,14 +780,14 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
cbState->canceled.store(1);
}
- _pool->schedule([this, cbState](auto status) {
+ _pool->schedule([this, cbState, expectedExhaustIter](auto status) {
invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
});
});
} else {
- _pool->schedule([this, cbState](auto status) {
+ _pool->schedule([this, cbState, expectedExhaustIter](auto status) {
if (ErrorCodes::isCancelationError(status.code())) {
stdx::lock_guard<Latch> lk(_mutex);
@@ -795,14 +796,15 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
fassert(4615617, status);
}
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
});
}
_net->signalWorkAvailable();
}
-void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) {
+void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState,
+ WorkQueue::iterator expectedExhaustIter) {
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, cbState);
CallbackArgs args(this,
@@ -827,9 +829,16 @@ void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> c
// handled in 'runCallback'.
stdx::lock_guard<Latch> lk(_mutex);
+ // It is possible that we receive multiple responses in quick succession. If this happens, the
+ // later responses can overwrite the 'exhaustIter' value on the cbState when adding the cbState
+ // to the '_poolInProgressQueue' if the previous responses have not been run yet. We take in the
+ // 'expectedExhaustIter' so that we can still remove this task from the 'poolInProgressQueue' if
+ // this happens, but we do not want to reset the 'exhaustIter' value in this case.
if (cbState->exhaustIter) {
- _poolInProgressQueue.erase(cbState->exhaustIter.get());
- cbState->exhaustIter = boost::none;
+ _poolInProgressQueue.erase(expectedExhaustIter);
+ if (cbState->exhaustIter.get() == expectedExhaustIter) {
+ cbState->exhaustIter = boost::none;
+ }
}
if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 4fbd38b2ba6..16230554948 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -198,7 +198,8 @@ private:
/**
* Executes the callback specified by "cbState". Will not mark cbState as finished.
*/
- void runCallbackExhaust(std::shared_ptr<CallbackState> cbState);
+ void runCallbackExhaust(std::shared_ptr<CallbackState> cbState,
+ WorkQueue::iterator expectedExhaustIter);
bool _inShutdown_inlock() const;
void _setState_inlock(State newState);