From a1a0ecfbf02293cf917db5e56d65d367be5ad5a7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 6 Nov 2007 16:45:30 +0000 Subject: Temporary fix to issue that results in an assertion from Dispatcher.cpp. Where an interest in write is signalled just as a readable event is triggered it is possible for a writeable (or read-writeable) event to be triggered before the earlier event is processed. This change ensures they are processed serially by queueing them up for the first thread to handle. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592485 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/Dispatcher.cpp | 60 ++++++++++++++++++++++++++++++++++++++--- cpp/src/qpid/sys/Dispatcher.h | 23 +++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index d49af9d079..6218840cfa 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -43,7 +43,10 @@ void Dispatcher::run() { // If can read/write then dispatch appropriate callbacks if (h) { - h->dispatchCallbacks(event.type); + //TODO: this is a temporary fix to ensure that if two + //events are being processed concurrently, the first thread + //will call dispatchCallbacks serially for each one + h->handle(event.type); } else { // Handle shutdown switch (event.type) { @@ -422,8 +425,59 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case DELAYED_DELETE: break; } - } - delete this; + } + //TODO: this is a temporary fix to mark the handle as deleted, + //but delay deletion until the handle loop below ends + deleted = true; +} + +/** + * TODO: The following are part of a temporary fix to ensure that + * where a new event is generated for the same handle while an + * earlier one is still being processed (due to an interest in + * writeability being declared) the events are processed serially + * by the first thread. + */ +void DispatchHandle::handle(Poller::EventType type) +{ + if (start(type)) { + dispatchCallbacks(type); + drain(); + if (deleted) delete this; + } +} + +bool DispatchHandle::start(Poller::EventType type) +{ + Mutex::ScopedLock l(processLock); + if (processing) { + events.push(type); + return false; + } else { + processing = true; + return true; + } +} + +void DispatchHandle::drain() +{ + Poller::EventType type; + while (next(type)) { + dispatchCallbacks(type); + } +} + +bool DispatchHandle::next(Poller::EventType& type) +{ + Mutex::ScopedLock l(processLock); + if (events.empty()) { + processing = false; + return false; + } else { + type = events.front(); + events.pop(); + return true; + } } }} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 2de026e141..0cfaf8b5d4 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -27,6 +27,7 @@ #include "Mutex.h" #include +#include #include #include @@ -53,13 +54,33 @@ private: DELAYED_DELETE } state; + /** + * TODO: The following are part of a temporary fix to ensure that + * where a new event is generated for the same handle while an + * earlier one is still being processed (due to an interest in + * writeability being declared) the events are processed serially + * by the first thread. + */ + Mutex processLock; + bool processing; + bool deleted; + std::queue events; + + bool start(Poller::EventType type); + void handle(Poller::EventType type); + void drain(); + bool next(Poller::EventType& type); + /**************************************************************/ + public: DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : PollerHandle(s), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), - state(IDLE) + state(IDLE), + processing(false), + deleted(false) {} ~DispatchHandle(); -- cgit v1.2.1