diff options
author | jannaerin <golden.janna@gmail.com> | 2020-04-27 18:32:16 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-28 16:45:49 +0000 |
commit | bbc0199f6084ac56dc5ff28f41a277d0d678540b (patch) | |
tree | fc4f98fe28ae707e0fd97581df0169de2068290e | |
parent | 417e36920af24736f7ebdd107d9b3258290b5393 (diff) | |
download | mongo-bbc0199f6084ac56dc5ff28f41a277d0d678540b.tar.gz |
SERVER-47815 Ensure all exhaust tasks are erased from task executor queue
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 27 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 3 |
2 files changed, 20 insertions, 10 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 3db33453800..765c4f459a7 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -765,12 +765,13 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call stdx::unique_lock<Latch> lk) { _poolInProgressQueue.push_back(cbState); cbState->exhaustIter = --_poolInProgressQueue.end(); + auto expectedExhaustIter = cbState->exhaustIter.get(); lk.unlock(); if (cbState->baton) { - cbState->baton->schedule([this, cbState](Status status) { + cbState->baton->schedule([this, cbState, expectedExhaustIter](Status status) { if (status.isOK()) { - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); return; } @@ -779,14 +780,14 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call cbState->canceled.store(1); } - _pool->schedule([this, cbState](auto status) { + _pool->schedule([this, cbState, expectedExhaustIter](auto status) { invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); }); }); } else { - _pool->schedule([this, cbState](auto status) { + _pool->schedule([this, cbState, expectedExhaustIter](auto status) { if (ErrorCodes::isCancelationError(status.code())) { stdx::lock_guard<Latch> lk(_mutex); @@ -795,14 +796,15 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call fassert(4615617, status); } - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); }); } _net->signalWorkAvailable(); } -void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) { +void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState, + WorkQueue::iterator expectedExhaustIter) { CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, cbState); CallbackArgs args(this, @@ -827,9 +829,16 @@ void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> c // handled in 'runCallback'. stdx::lock_guard<Latch> lk(_mutex); + // It is possible that we receive multiple responses in quick succession. If this happens, the + // later responses can overwrite the 'exhaustIter' value on the cbState when adding the cbState + // to the '_poolInProgressQueue' if the previous responses have not been run yet. We take in the + // 'expectedExhaustIter' so that we can still remove this task from the 'poolInProgressQueue' if + // this happens, but we do not want to reset the 'exhaustIter' value in this case. if (cbState->exhaustIter) { - _poolInProgressQueue.erase(cbState->exhaustIter.get()); - cbState->exhaustIter = boost::none; + _poolInProgressQueue.erase(expectedExhaustIter); + if (cbState->exhaustIter.get() == expectedExhaustIter) { + cbState->exhaustIter = boost::none; + } } if (_inShutdown_inlock() && _poolInProgressQueue.empty()) { diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 4fbd38b2ba6..16230554948 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -198,7 +198,8 @@ private: /** * Executes the callback specified by "cbState". Will not mark cbState as finished. */ - void runCallbackExhaust(std::shared_ptr<CallbackState> cbState); + void runCallbackExhaust(std::shared_ptr<CallbackState> cbState, + WorkQueue::iterator expectedExhaustIter); bool _inShutdown_inlock() const; void _setState_inlock(State newState); |