summaryrefslogtreecommitdiff
path: root/cpp/lib/common/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp176
1 files changed, 124 insertions, 52 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index ee6b6bbe2b..37e3c2257a 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -43,6 +43,7 @@
#include "check.h"
#include "EventChannel.h"
+#include "sys/AtomicCount.h"
using namespace std;
@@ -57,7 +58,17 @@ namespace sys {
namespace {
typedef enum { IN, OUT } Direction;
+
typedef std::pair<Event*, Event*> EventPair;
+
+/**
+ * Template to zero out a C-struct on construction. Avoids uninitialized memory
+ * warnings from valgrind or other mem checking tool.
+ */
+template <class T> struct CleanStruct : public T {
+ CleanStruct() { memset(this, 0, sizeof(*this)); }
+};
+
} // namespace
/**
@@ -153,25 +164,29 @@ class EventChannel::Impl {
Queue& getDispatchQueue() { return *dispatchQueue; }
+ void shutdown();
+
private:
typedef boost::ptr_map<int, Descriptor> DescriptorMap;
- Mutex lock;
+ Monitor monitor;
int epollFd;
DescriptorMap descriptors;
int pipe[2];
+ AtomicCount nWaiters;
Queue* dispatchQueue;
+ bool isShutdown;
};
-
// ================================================================
// EventChannel::Queue::implementation.
static const char* shutdownMsg = "Event queue shut down.";
-EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d),
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
+ lock(d.lock), descriptor(d),
myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
{}
@@ -193,11 +208,14 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) {
Event* EventChannel::Queue::wake(uint32_t epollFlags) {
// Called with lock held.
if (!queue.empty() && (isMyEvent(epollFlags))) {
- Event* e = queue.front()->complete(descriptor);
- if (e) {
- queue.pop_front();
- return e;
- }
+ assert(!queue.empty());
+ Event* e = queue.front();
+ assert(e);
+ // TODO aconway 2006-12-20: Can/should we move event completion
+ // out into dispatch() so it doesn't happen in Descriptor locks?
+ e->complete(descriptor);
+ queue.pop_front();
+ return e;
}
return 0;
}
@@ -250,9 +268,7 @@ void EventChannel::Descriptor::update() {
void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
// Caller holds lock
- assert(!isShutdown());
- struct epoll_event ee;
- memset(&ee, 0, sizeof(ee));
+ CleanStruct<epoll_event> ee;
ee.data.ptr = this;
ee.events = events;
int status = ::epoll_ctl(epollFd, op, myFd, &ee);
@@ -263,7 +279,6 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
Mutex::ScopedLock l(lock);
- cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl;
// If we have an error:
if (epollEvents & (EPOLLERR | EPOLLHUP)) {
shutdownUnsafe();
@@ -282,7 +297,7 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
EventChannel::Impl::Impl(int epollSize):
- epollFd(-1), dispatchQueue(0)
+ epollFd(-1), dispatchQueue(0), isShutdown(false)
{
// Create the epoll file descriptor.
epollFd = epoll_create(epollSize);
@@ -298,22 +313,90 @@ EventChannel::Impl::Impl(int epollSize):
}
EventChannel::Impl::~Impl() {
- close(epollFd);
- close(pipe[0]);
- close(pipe[1]);
+ shutdown();
+ ::close(epollFd);
+ ::close(pipe[0]);
+ ::close(pipe[1]);
}
+void EventChannel::Impl::shutdown() {
+ Monitor::ScopedLock l(monitor);
+ if (!isShutdown) { // I'm starting shutdown.
+ isShutdown = true;
+ if (nWaiters == 0)
+ return;
+
+ // TODO aconway 2006-12-20: If I just close the epollFd will
+ // that wake all threads? If so with what? Would be simpler than:
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // Since we use level-triggered epoll this will wake up all
+ // wait() threads.
+ //
+ QPID_POSIX_CHECK(::pipe(pipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = 0;
+ ee.events = EPOLLIN;
+ QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee));
+ }
+ // Wait for nWaiters to get out.
+ while (nWaiters > 0) {
+ monitor.wait();
+ }
+}
+
+// TODO aconway 2006-12-20: DEBUG remove
+struct epoll {
+ epoll(uint32_t e) : events(e) { }
+ uint32_t events;
+};
+
+#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
+ostream& operator << (ostream& out, epoll e) {
+ out << "epoll_event.events: ";
+ BIT(EPOLLIN);
+ BIT(EPOLLPRI);
+ BIT(EPOLLOUT);
+ BIT(EPOLLRDNORM);
+ BIT(EPOLLRDBAND);
+ BIT(EPOLLWRNORM);
+ BIT(EPOLLWRBAND);
+ BIT(EPOLLMSG);
+ BIT(EPOLLERR);
+ BIT(EPOLLHUP);
+ BIT(EPOLLONESHOT);
+ BIT(EPOLLET);
+ return out;
+}
+
+
+
/**
* Wait for epoll to wake up, return the descriptor or 0 on timeout.
*/
Event* EventChannel::Impl::wait(Time timeoutNs)
{
+ {
+ Monitor::ScopedLock l(monitor);
+ if (isShutdown)
+ throw ShutdownException();
+ }
+
+ // Increase nWaiters for the duration, notify the monitor if I'm
+ // the last one out.
+ //
+ AtomicCount::ScopedIncrement si(
+ nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
+
// No lock, all thread safe calls or local variables:
//
const long timeoutMs =
(timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
- struct epoll_event ee;
+ CleanStruct<epoll_event> ee;
Event* event = 0;
bool doSwap = true;
@@ -324,14 +407,18 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
if (n == 0) // Timeout
return 0;
- if (n < 0 && errno != EINTR) // Interrupt, ignore it.
+ if (n < 0 && errno == EINTR) // Interrupt, ignore it.
continue;
if (n < 0)
throw QPID_POSIX_ERROR(errno);
assert(n == 1);
Descriptor* ed =
reinterpret_cast<Descriptor*>(ee.data.ptr);
- assert(ed);
+ if (ed == 0) // We're being shut-down.
+ throw ShutdownException();
+ assert(ed != 0);
+ // TODO aconway 2006-12-20: DEBUG
+ cout << endl << epoll(ee.events) << endl;
EventPair ready = ed->wake(ee.events);
// We can only return one event so if both completed push one
@@ -352,7 +439,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
}
EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(monitor);
Descriptor& ed = descriptors[fd];
ed.activate(epollFd, fd);
return ed;
@@ -385,6 +472,10 @@ Event* EventChannel::wait(Time timeoutNs)
return impl->wait(timeoutNs);
}
+void EventChannel::shutdown() {
+ impl->shutdown();
+}
+
// ================================================================
// Event and subclasses.
@@ -430,31 +521,18 @@ void ReadEvent::prepare(EventChannel::Impl& impl) {
impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* ReadEvent::complete(EventChannel::Descriptor& ed)
+void ReadEvent::complete(EventChannel::Descriptor& ed)
{
ssize_t n = ::read(descriptor,
static_cast<char*>(buffer) + bytesRead,
size - bytesRead);
-
- if (n < 0 && errno != EAGAIN) { // Error
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
- else if (n == 0) { // End of file
- // TODO aconway 2006-12-13: Don't treat EOF as exception
- // unless we're partway thru a !noWait read.
- setException(QPID_POSIX_ERROR(ENODATA));
- ed.shutdownUnsafe(); // Called with lock held.
+ if (n > 0)
+ bytesRead += n;
+ if (n == 0 || (n < 0 && errno != EAGAIN)) {
+ // Use ENODATA for file closed.
+ setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
+ ed.shutdownUnsafe();
}
- else {
- if (n > 0) // possible that n < 0 && errno == EAGAIN
- bytesRead += n;
- if (bytesRead < size && !noWait) {
- // Continue reading, not enough data.
- return 0;
- }
- }
- return this;
}
@@ -463,42 +541,36 @@ void WriteEvent::prepare(EventChannel::Impl& impl) {
}
-Event* WriteEvent::complete(EventChannel::Descriptor& ed)
+void WriteEvent::complete(EventChannel::Descriptor& ed)
{
ssize_t n = ::write(descriptor,
static_cast<const char*>(buffer) + bytesWritten,
size - bytesWritten);
- if(n < 0 && errno == EAGAIN && noWait) {
- return 0;
- }
- if (n < 0 || (bytesWritten += n) < size) {
+ if (n > 0)
+ bytesWritten += n;
+ if(n < 0 && errno != EAGAIN) {
setException(QPID_POSIX_ERROR(errno));
ed.shutdownUnsafe(); // Called with lock held.
}
- return this;
}
void AcceptEvent::prepare(EventChannel::Impl& impl) {
impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* AcceptEvent::complete(EventChannel::Descriptor& ed)
+void AcceptEvent::complete(EventChannel::Descriptor& ed)
{
accepted = ::accept(descriptor, 0, 0);
if (accepted < 0) {
setException(QPID_POSIX_ERROR(errno));
ed.shutdownUnsafe(); // Called with lock held.
}
- return this;
}
void DispatchEvent::prepare(EventChannel::Impl& impl) {
impl.getDispatchQueue().push(this);
}
-Event* DispatchEvent::complete(EventChannel::Descriptor&)
-{
- return this;
-}
+void DispatchEvent::complete(EventChannel::Descriptor&) {}
}}