diff options
author | Gordon Sim <gsim@apache.org> | 2007-11-06 16:45:30 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-11-06 16:45:30 +0000 |
commit | a1a0ecfbf02293cf917db5e56d65d367be5ad5a7 (patch) | |
tree | 7be501ee181be2314d91bdaf4b3340f24cd201f2 /cpp/src | |
parent | 2eb4318357d07ec7dea9b034de6f14fe792ebe7b (diff) | |
download | qpid-python-a1a0ecfbf02293cf917db5e56d65d367be5ad5a7.tar.gz |
Temporary fix to issue that results in an assertion from Dispatcher.cpp. Where an interest in write is signalled just as a readable event is triggered it is possible for a writeable (or read-writeable) event to be triggered before the earlier event is processed. This change ensures they are processed serially by queueing them up for the first thread to handle.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.h | 23 |
2 files changed, 79 insertions, 4 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index d49af9d079..6218840cfa 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -43,7 +43,10 @@ void Dispatcher::run() { // If can read/write then dispatch appropriate callbacks if (h) { - h->dispatchCallbacks(event.type); + //TODO: this is a temporary fix to ensure that if two + //events are being processed concurrently, the first thread + //will call dispatchCallbacks serially for each one + h->handle(event.type); } else { // Handle shutdown switch (event.type) { @@ -422,8 +425,59 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case DELAYED_DELETE: break; } - } - delete this; + } + //TODO: this is a temporary fix to mark the handle as deleted, + //but delay deletion until the handle loop below ends + deleted = true; +} + +/** + * TODO: The following are part of a temporary fix to ensure that + * where a new event is generated for the same handle while an + * earlier one is still being processed (due to an interest in + * writeability being declared) the events are processed serially + * by the first thread. + */ +void DispatchHandle::handle(Poller::EventType type) +{ + if (start(type)) { + dispatchCallbacks(type); + drain(); + if (deleted) delete this; + } +} + +bool DispatchHandle::start(Poller::EventType type) +{ + Mutex::ScopedLock l(processLock); + if (processing) { + events.push(type); + return false; + } else { + processing = true; + return true; + } +} + +void DispatchHandle::drain() +{ + Poller::EventType type; + while (next(type)) { + dispatchCallbacks(type); + } +} + +bool DispatchHandle::next(Poller::EventType& type) +{ + Mutex::ScopedLock l(processLock); + if (events.empty()) { + processing = false; + return false; + } else { + type = events.front(); + events.pop(); + return true; + } } }} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 2de026e141..0cfaf8b5d4 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -27,6 +27,7 @@ #include "Mutex.h" #include <memory> +#include <queue> #include <boost/function.hpp> #include <assert.h> @@ -53,13 +54,33 @@ private: DELAYED_DELETE } state; + /** + * TODO: The following are part of a temporary fix to ensure that + * where a new event is generated for the same handle while an + * earlier one is still being processed (due to an interest in + * writeability being declared) the events are processed serially + * by the first thread. + */ + Mutex processLock; + bool processing; + bool deleted; + std::queue<Poller::EventType> events; + + bool start(Poller::EventType type); + void handle(Poller::EventType type); + void drain(); + bool next(Poller::EventType& type); + /**************************************************************/ + public: DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : PollerHandle(s), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), - state(IDLE) + state(IDLE), + processing(false), + deleted(false) {} ~DispatchHandle(); |