diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-11-08 17:45:54 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-11-08 17:45:54 +0000 |
commit | 10a794c9d2fede2a0db9cf80e95f19d56e931196 (patch) | |
tree | d958e38ed4a14d992029548107a17e726ff20402 /cpp/src | |
parent | 36af925c0645e87661df4ba8448244b69492a0c1 (diff) | |
download | qpid-python-10a794c9d2fede2a0db9cf80e95f19d56e931196.tar.gz |
Improved Fix for the race condition where you've got a competing read and write
for the same handle. This can happen if we've just got a read event then before
handling it we watch for write events and get one immediately.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 67 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.h | 22 |
2 files changed, 15 insertions, 74 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 6218840cfa..7e47ce3d3a 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -46,7 +46,7 @@ void Dispatcher::run() { //TODO: this is a temporary fix to ensure that if two //events are being processed concurrently, the first thread //will call dispatchCallbacks serially for each one - h->handle(event.type); + h->dispatchCallbacks(event.type); } else { // Handle shutdown switch (event.type) { @@ -363,6 +363,18 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case ACTIVE_RW: state = DELAYED_RW; break; + // Can only get here in a DELAYED_* state in the rare case + // that we're already here for reading and we get activated for + // writing and we can write (it might be possible the other way + // round too). In this case we're already processing the handle + // in a different thread in this function so return right away + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + case DELAYED_IDLE: + case DELAYED_DELETE: + return; default: assert(false); } @@ -426,58 +438,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { break; } } - //TODO: this is a temporary fix to mark the handle as deleted, - //but delay deletion until the handle loop below ends - deleted = true; -} - -/** - * TODO: The following are part of a temporary fix to ensure that - * where a new event is generated for the same handle while an - * earlier one is still being processed (due to an interest in - * writeability being declared) the events are processed serially - * by the first thread. - */ -void DispatchHandle::handle(Poller::EventType type) -{ - if (start(type)) { - dispatchCallbacks(type); - drain(); - if (deleted) delete this; - } -} - -bool DispatchHandle::start(Poller::EventType type) -{ - Mutex::ScopedLock l(processLock); - if (processing) { - events.push(type); - return false; - } else { - processing = true; - return true; - } -} - -void DispatchHandle::drain() -{ - Poller::EventType type; - while (next(type)) { - dispatchCallbacks(type); - } -} - -bool DispatchHandle::next(Poller::EventType& type) -{ - Mutex::ScopedLock l(processLock); - if (events.empty()) { - processing = false; - return false; - } else { - type = events.front(); - events.pop(); - return true; - } + delete this; } }} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 0cfaf8b5d4..7cc4873068 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -54,33 +54,13 @@ private: DELAYED_DELETE } state; - /** - * TODO: The following are part of a temporary fix to ensure that - * where a new event is generated for the same handle while an - * earlier one is still being processed (due to an interest in - * writeability being declared) the events are processed serially - * by the first thread. - */ - Mutex processLock; - bool processing; - bool deleted; - std::queue<Poller::EventType> events; - - bool start(Poller::EventType type); - void handle(Poller::EventType type); - void drain(); - bool next(Poller::EventType& type); - /**************************************************************/ - public: DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : PollerHandle(s), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), - state(IDLE), - processing(false), - deleted(false) + state(IDLE) {} ~DispatchHandle(); |