diff options
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 60 |
1 files changed, 57 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index d49af9d079..6218840cfa 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -43,7 +43,10 @@ void Dispatcher::run() { // If can read/write then dispatch appropriate callbacks if (h) { - h->dispatchCallbacks(event.type); + //TODO: this is a temporary fix to ensure that if two + //events are being processed concurrently, the first thread + //will call dispatchCallbacks serially for each one + h->handle(event.type); } else { // Handle shutdown switch (event.type) { @@ -422,8 +425,59 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case DELAYED_DELETE: break; } - } - delete this; + } + //TODO: this is a temporary fix to mark the handle as deleted, + //but delay deletion until the handle loop below ends + deleted = true; +} + +/** + * TODO: The following are part of a temporary fix to ensure that + * where a new event is generated for the same handle while an + * earlier one is still being processed (due to an interest in + * writeability being declared) the events are processed serially + * by the first thread. + */ +void DispatchHandle::handle(Poller::EventType type) +{ + if (start(type)) { + dispatchCallbacks(type); + drain(); + if (deleted) delete this; + } +} + +bool DispatchHandle::start(Poller::EventType type) +{ + Mutex::ScopedLock l(processLock); + if (processing) { + events.push(type); + return false; + } else { + processing = true; + return true; + } +} + +void DispatchHandle::drain() +{ + Poller::EventType type; + while (next(type)) { + dispatchCallbacks(type); + } +} + +bool DispatchHandle::next(Poller::EventType& type) +{ + Mutex::ScopedLock l(processLock); + if (events.empty()) { + processing = false; + return false; + } else { + type = events.front(); + events.pop(); + return true; + } } }} |