diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannel.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/EventChannel.cpp | 679 |
1 files changed, 475 insertions, 204 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp index 6db397a165..d35eedf5a5 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.cpp +++ b/cpp/src/qpid/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,19 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + +#include "EventChannel.h" +#include "check.h" + +#include "qpid/QpidError.h" +#include "qpid/sys/AtomicCount.h" + #include <mqueue.h> #include <string.h> #include <iostream> @@ -29,139 +42,420 @@ #include <queue> #include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> - -#include "qpid/QpidError.h" -#include "qpid/sys/Monitor.h" -#include "qpid/log/Statement.h" - -#include "check.h" -#include "EventChannel.h" +#include <boost/noncopyable.hpp> +#include <boost/bind.hpp> using namespace std; -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; + +typedef std::pair<Event*, Event*> EventPair; + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template <class T> struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + +} // namespace + +/** + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } + + bool empty() { return queue.empty(); } + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque<Event*> EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + explicit Descriptor(int fd) : epollFd(-1), myFd(fd), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_); + + /** Epoll woke up for this descriptor. */ + Event* wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + int getFD() const { return myFd; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + Queue* pick(); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; + int myFd; + Queue inQueue, outQueue; + bool preferIn; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds a map of Descriptors, which do most of the work. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); + + ~Impl(); + + /** + * Activate descriptor + */ + void activate(Descriptor& d) { + d.activate(epollFd); + } + + /** Wait for an event, return 0 on timeout */ + Event* wait(Duration timeout); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. + void shutdown(); + + private: + + Monitor monitor; + int epollFd; + int shutdownPipe[2]; + AtomicCount nWaiters; + bool isShutdown; +}; + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); +} + +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; +} + +// TODO aconway 2006-12-20: REMOVE +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + if (!e->getException()) { + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + } + queue.pop_front(); + return e; + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); +} - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_) { + Mutex::ScopedLock l(lock); + if (isShutdown()) { + epollFd = epollFd_; // We're back in business. + epollCtl(EPOLL_CTL_ADD, 0); + } } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Mark myself as shutdown. + inQueue.shutdown(); + outQueue.shutdown(); +} + +// TODO aconway 2006-12-20: Inline into wake(). +void EventChannel::Descriptor::update() { + // Caller holds lock. + if (isShutdown()) // Nothing to do + return; + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + CleanStruct<epoll_event> ee; + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) { + if (errno == EEXIST) // It's okay to add an existing fd + return; + else if (errno == EBADF) // FD was closed externally. + shutdownUnsafe(); + else + throw QPID_POSIX_ERROR(errno); + } } + -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) +EventChannel::Queue* EventChannel::Descriptor::pick() { + if (inQueue.empty() && outQueue.empty()) return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + if (inQueue.empty() || outQueue.empty()) + return !inQueue.empty() ? &inQueue : &outQueue; + // Neither is empty, pick fairly. + preferIn = !preferIn; + return preferIn ? &inQueue : &outQueue; } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); +Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + // On error, shut down the Descriptor and both queues. + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // TODO aconway 2006-12-20: This error handling models means + // that any error reported by epoll will result in a shutdown + // exception on the events. Can we get more accurate error + // reporting somehow? + } + Queue*q = 0; + bool in = (epollEvents & EPOLLIN); + bool out = (epollEvents & EPOLLOUT); + if ((in && out) || isShutdown()) + q = pick(); // Choose fairly, either non-empty queue. + else if (in) + q = &inQueue; + else if (out) + q = &outQueue; + Event* e = (q && !q->empty()) ? q->pop() : 0; + update(); + if (e) + e->complete(*this); + return e; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), isShutdown(false) { - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(shutdownPipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + shutdown(); + ::close(epollFd); + ::close(shutdownPipe[0]); + ::close(shutdownPipe[1]); } -Event* EventHandler::complete(EventHandler& eh) -{ - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + CleanStruct<epoll_event> ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK( + epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; } + + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Duration timeoutNs) +{ + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + CleanStruct<epoll_event> ee; + Event* event = 0; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + //TODO aconway 2006-12-20: FIX THIS! + while (!event) { + int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno == EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast<Descriptor*>(ee.data.ptr); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + event = ed->wake(ee.events); + } + return event; +} + +//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { +// Mutex::ScopedLock l(monitor); +// Descriptor& ed = descriptors[fd]; +// ed.activate(epollFd, fd); +// return ed; +//} + + // ================================================================ // EventChannel @@ -169,157 +463,134 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() +Event* EventChannel::wait(Duration timeoutNs) { - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; + return impl->wait(timeoutNs); +} - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - QPID_LOG(warn, "Ignoring error: " - << PosixError::getMessage(errno)); - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::shutdown() { + impl->shutdown(); } + +// ================================================================ +// Event and subclasses. + Event::~Event() {} -void Event::prepare(EventHandler& handler) -{ - handler.mqPut(this); +Exception::shared_ptr_const Event::getException() const { + return exception; } -bool Event::hasError() const { - return error; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); -} - -Event* Event::complete(EventHandler&) +void Event::dispatch() { - return this; + if (!callback.empty()) + callback(); } -void Event::dispatch() -{ +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; +int FDEvent::getFDescriptor() const { + return descriptor.getFD(); } -void ReadEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak +ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) { } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; +void ReadEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* ReadEvent::complete(EventHandler& handler) +void ReadEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); - return 0; - } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + ssize_t n = ::read(getFDescriptor(), + static_cast<char*>(buffer) + bytesRead, + size - bytesRead); + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) { +} + +void WriteEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(OUT).push(this); } -Event* WriteEvent::complete(EventHandler& handler) + +void WriteEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + ssize_t n = ::write(getFDescriptor(), + static_cast<const char*>(buffer) + bytesWritten, + size - bytesWritten); + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); - return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +AcceptEvent::AcceptEvent(int fd, Callback cb) : + FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) { +} + +void AcceptEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); - return this; + accepted = ::accept(getFDescriptor(), 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } } }} |