summaryrefslogtreecommitdiff
path: root/cpp/lib/common/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp572
1 files changed, 376 insertions, 196 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index 16c7ec9c3f..860ecd6b07 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -1,4 +1,4 @@
-/*
+/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
@@ -16,6 +16,13 @@
*
*/
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
#include <mqueue.h>
#include <string.h>
#include <iostream>
@@ -29,10 +36,10 @@
#include <queue>
#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
#include <QpidError.h>
-#include <sys/Monitor.h>
#include "check.h"
#include "EventChannel.h"
@@ -40,127 +47,319 @@
using namespace std;
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
namespace qpid {
namespace sys {
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+typedef std::pair<Event*, Event*> EventPair;
+} // namespace
+
/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
{
public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
+ Queue(Descriptor& container, Direction dir);
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
+ /** Called by Event classes in prepare() */
+ void push(Event* e);
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
+ /** Called when epoll wakes.
+ *@return The next completed event or 0.
+ */
+ Event* wake(uint32_t epollFlags);
+
+ void setBit(uint32_t &epollFlags);
+
+ void shutdown();
private:
+ typedef std::deque<Event*> EventQ;
+
+ inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
+
+ Mutex& lock; // Shared with Descriptor.
+ Descriptor& descriptor;
+ uint32_t myEvent; // Epoll event flag.
+ EventQ queue;
+};
+
+
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+ public:
+ Descriptor() : epollFd(-1), myFd(-1),
+ inQueue(*this, IN), outQueue(*this, OUT) {}
+
+ void activate(int epollFd_, int myFd_);
+
+ /** Epoll woke up for this descriptor. */
+ EventPair wake(uint32_t epollEvents);
+
+ /** Shut down: close and remove file descriptor.
+ * May be re-activated if fd is reused.
+ */
+ void shutdown();
+
+ // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+ void shutdownUnsafe();
+
+ bool isShutdown() { return epollFd == -1; }
+
+ Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+
+ private:
+ void update();
+ void epollCtl(int op, uint32_t events);
+
+ Mutex lock;
int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
+ int myFd;
+ Queue inQueue, outQueue;
+
+ friend class Queue;
};
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+/**
+ * Holds the epoll fd, Descriptor map and dispatch queue.
+ * Most of the epoll work is done by the Descriptors.
+ */
+class EventChannel::Impl {
+ public:
+ Impl(int size = 256);
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
- //
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
+ ~Impl();
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
+ /**
+ * Registers fd if not already registered.
+ */
+ Descriptor& getDescriptor(int fd);
+
+ /** Wait for an event, return 0 on timeout */
+ Event* wait(Time timeout);
+
+ Queue& getDispatchQueue() { return *dispatchQueue; }
+
+ private:
+
+ typedef boost::ptr_map<int, Descriptor> DescriptorMap;
+
+ Mutex lock;
+ int epollFd;
+ DescriptorMap descriptors;
+ int pipe[2];
+ Queue* dispatchQueue;
+};
+
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d),
+ myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+ Mutex::ScopedLock l(lock);
+ if (descriptor.isShutdown())
+ THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+ queue.push_back(e);
+ descriptor.update();
}
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+ if (queue.empty())
+ epollFlags &= ~myEvent;
+ else
+ epollFlags |= myEvent;
}
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+ // Called with lock held.
+ if (!queue.empty() && (isMyEvent(epollFlags))) {
+ Event* e = queue.front()->complete(descriptor);
+ if (e) {
+ queue.pop_front();
+ return e;
+ }
+ }
+ return 0;
+}
+
+void EventChannel::Queue::shutdown() {
+ // Mark all pending events with a shutdown exception.
+ // The server threads will remove and dispatch the events.
+ //
+ qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION);
+ for_each(queue.begin(), queue.end(),
+ boost::bind(&Event::setException, _1, ex));
}
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
- return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_, int myFd_) {
+ Mutex::ScopedLock l(lock);
+ assert(myFd < 0 || (myFd == myFd_)); // Can't change fd.
+ if (epollFd < 0) { // Means we're not polling.
+ epollFd = epollFd_;
+ myFd = myFd_;
+ epollCtl(EPOLL_CTL_ADD, 0);
+ }
}
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+void EventChannel::Descriptor::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownUnsafe();
+}
+
+void EventChannel::Descriptor::shutdownUnsafe() {
+ // Caller holds lock.
+ ::close(myFd);
+ epollFd = -1; // Indicate we are not polling.
+ inQueue.shutdown();
+ outQueue.shutdown();
+ epollCtl(EPOLL_CTL_DEL, 0);
+}
+
+void EventChannel::Descriptor::update() {
+ // Caller holds lock.
+ uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+ inQueue.setBit(events);
+ outQueue.setBit(events);
+ epollCtl(EPOLL_CTL_MOD, events);
+}
+
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+ // Caller holds lock
+ assert(!isShutdown());
+ struct epoll_event ee;
+ memset(&ee, 0, sizeof(ee));
+ ee.data.ptr = this;
+ ee.events = events;
+ int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+ if (status < 0)
throw QPID_POSIX_ERROR(errno);
+ }
+}
+
+
+EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
+ Mutex::ScopedLock l(lock);
+ cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl;
+ // If we have an error:
+ if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+ shutdownUnsafe();
+ // Complete both sides on error so the event can fail and
+ // mark itself with an exception.
+ epollEvents |= EPOLLIN | EPOLLOUT;
+ }
+ EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents));
+ update();
+ return ready;
}
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+ epollFd(-1), dispatchQueue(0)
{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+ // Create the epoll file descriptor.
+ epollFd = epoll_create(epollSize);
+ QPID_POSIX_CHECK(epollFd);
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // We activate the FD when there are messages in the queue.
+ QPID_POSIX_CHECK(::pipe(pipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
+ dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN);
}
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
+EventChannel::Impl::~Impl() {
+ close(epollFd);
+ close(pipe[0]);
+ close(pipe[1]);
}
-Event* EventHandler::complete(EventHandler& eh)
+
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Time timeoutNs)
{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
+ // No lock, all thread safe calls or local variables:
+ //
+ const long timeoutMs =
+ (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+ struct epoll_event ee;
+ Event* event = 0;
+ bool doSwap = true;
+
+ // Loop till we get a completed event. Some events may repost
+ // themselves and return 0, e.g. incomplete read or write events.
+ //
+ while (!event) {
+ int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+ if (n == 0) // Timeout
+ return 0;
+ if (n < 0 && errno != EINTR) // Interrupt, ignore it.
+ continue;
+ if (n < 0)
+ throw QPID_POSIX_ERROR(errno);
+ assert(n == 1);
+ Descriptor* ed =
+ reinterpret_cast<Descriptor*>(ee.data.ptr);
+ assert(ed);
+ EventPair ready = ed->wake(ee.events);
+
+ // We can only return one event so if both completed push one
+ // onto the dispatch queue to be dispatched in another thread.
+ if (ready.first && ready.second) {
+ // Keep it fair: in & out take turns to be returned first.
+ if (doSwap)
+ swap(ready.first, ready.second);
+ doSwap = !doSwap;
+ event = ready.first;
+ dispatchQueue->push(ready.second);
+ }
+ else {
+ event = ready.first ? ready.first : ready.second;
+ }
+ }
+ return event;
}
-
+
+EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+ Mutex::ScopedLock l(lock);
+ Descriptor& ed = descriptors[fd];
+ ed.activate(epollFd, fd);
+ return ed;
+}
+
+
// ================================================================
// EventChannel
@@ -168,157 +367,138 @@ EventChannel::shared_ptr EventChannel::create() {
return shared_ptr(new EventChannel());
}
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
EventChannel::~EventChannel() {}
-void EventChannel::postEvent(Event& e)
+void EventChannel::post(Event& e)
{
- e.prepare(*handler);
+ e.prepare(*impl);
}
-Event* EventChannel::getEvent()
-{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
-
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- // TODO aconway 2006-11-28: Proper handling/logging of errors.
- cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
- << PosixError::getMessage(errno) << endl;
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
+void EventChannel::post(Event* e) {
+ assert(e);
+ post(*e);
}
-Event::~Event() {}
-
-void Event::prepare(EventHandler& handler)
+Event* EventChannel::wait(Time timeoutNs)
{
- handler.mqPut(this);
+ return impl->wait(timeoutNs);
}
-bool Event::hasError() const {
- return error;
-}
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
+// ================================================================
+// Event and subclasses.
+
+Event::~Event() {}
+
+Exception::shared_ptr_const Event::getException() const {
+ return exception;
}
-Event* Event::complete(EventHandler&)
-{
- return this;
+void Event::throwIfException() {
+ if (getException())
+ exception->throwSelf();
}
void Event::dispatch()
{
+ if (!callback.empty())
+ callback();
+}
+
+void Event::setException(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex)
+ exception.reset(ex->clone().release());
+ else
+ exception.reset(new Exception(e));
+#ifndef NDEBUG
+ // Throw and re-catch the exception. Has no effect on the
+ // program but it triggers debuggers watching for throw. The
+ // context that sets the exception is more informative for
+ // debugging purposes than the one that ultimately throws it.
+ //
try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ throwIfException();
}
+ catch (...) { } // Ignored.
+#endif
}
-void Event::setError(const ExceptionHolder& e) {
- error = e;
+
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-void ReadEvent::prepare(EventHandler& handler)
+Event* ReadEvent::complete(EventChannel::Descriptor& ed)
{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ ssize_t n = ::read(descriptor,
+ static_cast<char*>(buffer) + bytesRead,
+ size - bytesRead);
+
+ if (n < 0 && errno != EAGAIN) { // Error
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
+ else if (n == 0) { // End of file
+ // TODO aconway 2006-12-13: Don't treat EOF as exception
+ // unless we're partway thru a !noWait read.
+ setException(QPID_POSIX_ERROR(ENODATA));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
+ else {
+ if (n > 0) // possible that n < 0 && errno == EAGAIN
+ bytesRead += n;
+ if (bytesRead < size && !noWait) {
+ // Continue reading, not enough data.
+ return 0;
+ }
+ }
+ return this;
}
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(OUT).push(this);
}
-Event* ReadEvent::complete(EventHandler& handler)
+
+Event* WriteEvent::complete(EventChannel::Descriptor& ed)
{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ ssize_t n = ::write(descriptor,
+ static_cast<const char*>(buffer) + bytesWritten,
+ size - bytesWritten);
+ if(n < 0 && errno == EAGAIN && noWait) {
return 0;
}
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ if (n < 0 || (bytesWritten += n) < size) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
+ return this;
}
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* WriteEvent::complete(EventHandler& handler)
+Event* AcceptEvent::complete(EventChannel::Descriptor& ed)
{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
return this;
}
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+void DispatchEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDispatchQueue().push(this);
}
-Event* AcceptEvent::complete(EventHandler& handler)
+Event* DispatchEvent::complete(EventChannel::Descriptor&)
{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
return this;
}