diff options
author | Alan Conway <aconway@apache.org> | 2006-12-21 00:58:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-12-21 00:58:59 +0000 |
commit | 27e78c73ab671a465c35f3cd405f9bcf3ca3dbec (patch) | |
tree | 2435c1586195824640adfe9656c93b64dc09a808 | |
parent | 10039f3f81807ba4e9cbbcf2ae000f72888a56a7 (diff) | |
download | qpid-python-27e78c73ab671a465c35f3cd405f9bcf3ca3dbec.tar.gz |
EventChannel.cpp: Simplified handling of wakeups for multiple events by
using epoll's level-triggered behavior to "put back" an event and let
it wake up later.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489237 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannel.cpp | 67 | ||||
-rwxr-xr-x | cpp/tests/run-system-tests | 2 | ||||
-rwxr-xr-x | cpp/tests/topictest | 4 |
3 files changed, 45 insertions, 28 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index 26437484f8..79bf8030cb 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -88,6 +88,10 @@ class EventChannel::Queue : private boost::noncopyable */ Event* wake(uint32_t epollFlags); + Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } + + bool empty() { return queue.empty(); } + void setBit(uint32_t &epollFlags); void shutdown(); @@ -117,7 +121,7 @@ class EventChannel::Descriptor : private boost::noncopyable { void activate(int epollFd_, int myFd_); /** Epoll woke up for this descriptor. */ - EventPair wake(uint32_t epollEvents); + Event* wake(uint32_t epollEvents); /** Shut down: close and remove file descriptor. * May be re-activated if fd is reused. @@ -134,11 +138,13 @@ class EventChannel::Descriptor : private boost::noncopyable { private: void update(); void epollCtl(int op, uint32_t events); + Queue* pick(); Mutex lock; int epollFd; int myFd; Queue inQueue, outQueue; + bool preferIn; friend class Queue; }; @@ -205,6 +211,7 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) { epollFlags |= myEvent; } +// TODO aconway 2006-12-20: REMOVE Event* EventChannel::Queue::wake(uint32_t epollFlags) { // Called with lock held. if (!queue.empty() && (isMyEvent(epollFlags))) { @@ -259,6 +266,7 @@ void EventChannel::Descriptor::shutdownUnsafe() { outQueue.shutdown(); } +// TODO aconway 2006-12-20: Inline into wake(). void EventChannel::Descriptor::update() { // Caller holds lock. if (isShutdown()) // Nothing to do @@ -276,12 +284,26 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { ee.data.ptr = this; ee.events = events; int status = ::epoll_ctl(epollFd, op, myFd, &ee); - if (status < 0) - throw QPID_POSIX_ERROR(errno); + if (status < 0) { + if (errno == EBADF) // FD was closed externally. + shutdownUnsafe(); + else + throw QPID_POSIX_ERROR(errno); + } } -EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { +EventChannel::Queue* EventChannel::Descriptor::pick() { + if (inQueue.empty() && outQueue.empty()) + return 0; + if (inQueue.empty() || outQueue.empty()) + return !inQueue.empty() ? &inQueue : &outQueue; + // Neither is empty, pick fairly. + preferIn = !preferIn; + return preferIn ? &inQueue : &outQueue; +} + +Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { Mutex::ScopedLock l(lock); // On error, shut down the Descriptor and both queues. if (epollEvents & (EPOLLERR | EPOLLHUP)) { @@ -291,14 +313,24 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { // exception on the events. Can we get more accurate error // reporting somehow? } - // Check the queues even if shutdown - we want to drain the - // events that have been marked with exceptions. - EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents)); + Queue*q = 0; + bool in = (epollEvents & EPOLLIN); + bool out = (epollEvents & EPOLLOUT); + if ((in && out) || isShutdown()) + q = pick(); // Choose fairly, either non-empty queue. + else if (in) + q = &inQueue; + else if (out) + q = &outQueue; + Event* e = (q && !q->empty()) ? q->pop() : 0; update(); - return ready; + if (e) + e->complete(*this); + return e; } + // ================================================================ // EventChannel::Impl @@ -405,11 +437,10 @@ Event* EventChannel::Impl::wait(Time timeoutNs) (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; CleanStruct<epoll_event> ee; Event* event = 0; - bool doSwap = true; // Loop till we get a completed event. Some events may repost // themselves and return 0, e.g. incomplete read or write events. - // + //TODO aconway 2006-12-20: FIX THIS! while (!event) { int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. if (n == 0) // Timeout @@ -426,21 +457,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs) assert(ed != 0); // TODO aconway 2006-12-20: DEBUG cout << endl << epoll(ee.events) << endl; - EventPair ready = ed->wake(ee.events); - - // We can only return one event so if both completed push one - // onto the dispatch queue to be dispatched in another thread. - if (ready.first && ready.second) { - // Keep it fair: in & out take turns to be returned first. - if (doSwap) - swap(ready.first, ready.second); - doSwap = !doSwap; - event = ready.first; - dispatchQueue->push(ready.second); - } - else { - event = ready.first ? ready.first : ready.second; - } + event = ed->wake(ee.events); } return event; } diff --git a/cpp/tests/run-system-tests b/cpp/tests/run-system-tests index 5aa0f9a409..d9b5431845 100755 --- a/cpp/tests/run-system-tests +++ b/cpp/tests/run-system-tests @@ -24,7 +24,7 @@ run_test() { } run_test ./client_test -run_test ./topictest -l2 -m2 -b1 +run_test ./topictest -s2 -m2 -b1 # Run the python tests. if test -d ../../python ; then diff --git a/cpp/tests/topictest b/cpp/tests/topictest index da3a0c1f92..d68a8c81da 100755 --- a/cpp/tests/topictest +++ b/cpp/tests/topictest @@ -1,5 +1,5 @@ #!/bin/bash -# Run the c++ or topic test +# Run the C++ topic test # Defaults SUBSCRIBERS=10 @@ -12,7 +12,7 @@ while getopts "s:m:b:" opt ; do m) MESSAGES=$OPTARG ;; b) BATCHES=$OPTARG ;; ?) - echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]" + echo "Usage: %0 [-s <subscribers>] [-m <messages.] [-b <batches>]" exit 1 ;; esac |