summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-21 00:58:59 +0000
committerAlan Conway <aconway@apache.org>2006-12-21 00:58:59 +0000
commit27e78c73ab671a465c35f3cd405f9bcf3ca3dbec (patch)
tree2435c1586195824640adfe9656c93b64dc09a808
parent10039f3f81807ba4e9cbbcf2ae000f72888a56a7 (diff)
downloadqpid-python-27e78c73ab671a465c35f3cd405f9bcf3ca3dbec.tar.gz
EventChannel.cpp: Simplified handling of wakeups for multiple events by
using epoll's level-triggered behavior to "put back" an event and let it wake up later. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489237 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp67
-rwxr-xr-xcpp/tests/run-system-tests2
-rwxr-xr-xcpp/tests/topictest4
3 files changed, 45 insertions, 28 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index 26437484f8..79bf8030cb 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -88,6 +88,10 @@ class EventChannel::Queue : private boost::noncopyable
*/
Event* wake(uint32_t epollFlags);
+ Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
+
+ bool empty() { return queue.empty(); }
+
void setBit(uint32_t &epollFlags);
void shutdown();
@@ -117,7 +121,7 @@ class EventChannel::Descriptor : private boost::noncopyable {
void activate(int epollFd_, int myFd_);
/** Epoll woke up for this descriptor. */
- EventPair wake(uint32_t epollEvents);
+ Event* wake(uint32_t epollEvents);
/** Shut down: close and remove file descriptor.
* May be re-activated if fd is reused.
@@ -134,11 +138,13 @@ class EventChannel::Descriptor : private boost::noncopyable {
private:
void update();
void epollCtl(int op, uint32_t events);
+ Queue* pick();
Mutex lock;
int epollFd;
int myFd;
Queue inQueue, outQueue;
+ bool preferIn;
friend class Queue;
};
@@ -205,6 +211,7 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) {
epollFlags |= myEvent;
}
+// TODO aconway 2006-12-20: REMOVE
Event* EventChannel::Queue::wake(uint32_t epollFlags) {
// Called with lock held.
if (!queue.empty() && (isMyEvent(epollFlags))) {
@@ -259,6 +266,7 @@ void EventChannel::Descriptor::shutdownUnsafe() {
outQueue.shutdown();
}
+// TODO aconway 2006-12-20: Inline into wake().
void EventChannel::Descriptor::update() {
// Caller holds lock.
if (isShutdown()) // Nothing to do
@@ -276,12 +284,26 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
ee.data.ptr = this;
ee.events = events;
int status = ::epoll_ctl(epollFd, op, myFd, &ee);
- if (status < 0)
- throw QPID_POSIX_ERROR(errno);
+ if (status < 0) {
+ if (errno == EBADF) // FD was closed externally.
+ shutdownUnsafe();
+ else
+ throw QPID_POSIX_ERROR(errno);
+ }
}
-EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
+EventChannel::Queue* EventChannel::Descriptor::pick() {
+ if (inQueue.empty() && outQueue.empty())
+ return 0;
+ if (inQueue.empty() || outQueue.empty())
+ return !inQueue.empty() ? &inQueue : &outQueue;
+ // Neither is empty, pick fairly.
+ preferIn = !preferIn;
+ return preferIn ? &inQueue : &outQueue;
+}
+
+Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
Mutex::ScopedLock l(lock);
// On error, shut down the Descriptor and both queues.
if (epollEvents & (EPOLLERR | EPOLLHUP)) {
@@ -291,14 +313,24 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
// exception on the events. Can we get more accurate error
// reporting somehow?
}
- // Check the queues even if shutdown - we want to drain the
- // events that have been marked with exceptions.
- EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents));
+ Queue*q = 0;
+ bool in = (epollEvents & EPOLLIN);
+ bool out = (epollEvents & EPOLLOUT);
+ if ((in && out) || isShutdown())
+ q = pick(); // Choose fairly, either non-empty queue.
+ else if (in)
+ q = &inQueue;
+ else if (out)
+ q = &outQueue;
+ Event* e = (q && !q->empty()) ? q->pop() : 0;
update();
- return ready;
+ if (e)
+ e->complete(*this);
+ return e;
}
+
// ================================================================
// EventChannel::Impl
@@ -405,11 +437,10 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
(timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
CleanStruct<epoll_event> ee;
Event* event = 0;
- bool doSwap = true;
// Loop till we get a completed event. Some events may repost
// themselves and return 0, e.g. incomplete read or write events.
- //
+ //TODO aconway 2006-12-20: FIX THIS!
while (!event) {
int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
if (n == 0) // Timeout
@@ -426,21 +457,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
assert(ed != 0);
// TODO aconway 2006-12-20: DEBUG
cout << endl << epoll(ee.events) << endl;
- EventPair ready = ed->wake(ee.events);
-
- // We can only return one event so if both completed push one
- // onto the dispatch queue to be dispatched in another thread.
- if (ready.first && ready.second) {
- // Keep it fair: in & out take turns to be returned first.
- if (doSwap)
- swap(ready.first, ready.second);
- doSwap = !doSwap;
- event = ready.first;
- dispatchQueue->push(ready.second);
- }
- else {
- event = ready.first ? ready.first : ready.second;
- }
+ event = ed->wake(ee.events);
}
return event;
}
diff --git a/cpp/tests/run-system-tests b/cpp/tests/run-system-tests
index 5aa0f9a409..d9b5431845 100755
--- a/cpp/tests/run-system-tests
+++ b/cpp/tests/run-system-tests
@@ -24,7 +24,7 @@ run_test() {
}
run_test ./client_test
-run_test ./topictest -l2 -m2 -b1
+run_test ./topictest -s2 -m2 -b1
# Run the python tests.
if test -d ../../python ; then
diff --git a/cpp/tests/topictest b/cpp/tests/topictest
index da3a0c1f92..d68a8c81da 100755
--- a/cpp/tests/topictest
+++ b/cpp/tests/topictest
@@ -1,5 +1,5 @@
#!/bin/bash
-# Run the c++ or topic test
+# Run the C++ topic test
# Defaults
SUBSCRIBERS=10
@@ -12,7 +12,7 @@ while getopts "s:m:b:" opt ; do
m) MESSAGES=$OPTARG ;;
b) BATCHES=$OPTARG ;;
?)
- echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]"
+ echo "Usage: %0 [-s <subscribers>] [-m <messages.] [-b <batches>]"
exit 1
;;
esac