diff options
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannel.cpp | 572 |
1 files changed, 376 insertions, 196 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index 16c7ec9c3f..860ecd6b07 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,13 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + #include <mqueue.h> #include <string.h> #include <iostream> @@ -29,10 +36,10 @@ #include <queue> #include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> +#include <boost/noncopyable.hpp> +#include <boost/bind.hpp> #include <QpidError.h> -#include <sys/Monitor.h> #include "check.h" #include "EventChannel.h" @@ -40,127 +47,319 @@ using namespace std; -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; +typedef std::pair<Event*, Event*> EventPair; +} // namespace + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque<Event*> EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + Descriptor() : epollFd(-1), myFd(-1), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_, int myFd_); + + /** Epoll woke up for this descriptor. */ + EventPair wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; + int myFd; + Queue inQueue, outQueue; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds the epoll fd, Descriptor map and dispatch queue. + * Most of the epoll work is done by the Descriptors. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. - // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + ~Impl(); - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + /** + * Registers fd if not already registered. + */ + Descriptor& getDescriptor(int fd); + + /** Wait for an event, return 0 on timeout */ + Event* wait(Time timeout); + + Queue& getDispatchQueue() { return *dispatchQueue; } + + private: + + typedef boost::ptr_map<int, Descriptor> DescriptorMap; + + Mutex lock; + int epollFd; + DescriptorMap descriptors; + int pipe[2]; + Queue* dispatchQueue; +}; + + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + Event* e = queue.front()->complete(descriptor); + if (e) { + queue.pop_front(); + return e; + } + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. + // + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); } -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) - return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_, int myFd_) { + Mutex::ScopedLock l(lock); + assert(myFd < 0 || (myFd == myFd_)); // Can't change fd. + if (epollFd < 0) { // Means we're not polling. + epollFd = epollFd_; + myFd = myFd_; + epollCtl(EPOLL_CTL_ADD, 0); + } } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); +} + +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Indicate we are not polling. + inQueue.shutdown(); + outQueue.shutdown(); + epollCtl(EPOLL_CTL_DEL, 0); +} + +void EventChannel::Descriptor::update() { + // Caller holds lock. + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + struct epoll_event ee; + memset(&ee, 0, sizeof(ee)); + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) throw QPID_POSIX_ERROR(errno); + } +} + + +EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl; + // If we have an error: + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // Complete both sides on error so the event can fail and + // mark itself with an exception. + epollEvents |= EPOLLIN | EPOLLOUT; + } + EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents)); + update(); + return ready; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), dispatchQueue(0) { - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(pipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); + dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + close(epollFd); + close(pipe[0]); + close(pipe[1]); } -Event* EventHandler::complete(EventHandler& eh) + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Time timeoutNs) { - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + struct epoll_event ee; + Event* event = 0; + bool doSwap = true; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + // + while (!event) { + int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno != EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast<Descriptor*>(ee.data.ptr); + assert(ed); + EventPair ready = ed->wake(ee.events); + + // We can only return one event so if both completed push one + // onto the dispatch queue to be dispatched in another thread. + if (ready.first && ready.second) { + // Keep it fair: in & out take turns to be returned first. + if (doSwap) + swap(ready.first, ready.second); + doSwap = !doSwap; + event = ready.first; + dispatchQueue->push(ready.second); + } + else { + event = ready.first ? ready.first : ready.second; + } + } + return event; } - + +EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { + Mutex::ScopedLock l(lock); + Descriptor& ed = descriptors[fd]; + ed.activate(epollFd, fd); + return ed; +} + + // ================================================================ // EventChannel @@ -168,157 +367,138 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() -{ - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; - - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - // TODO aconway 2006-11-28: Proper handling/logging of errors. - cerr << BOOST_CURRENT_FUNCTION << " ignoring error " - << PosixError::getMessage(errno) << endl; - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::post(Event* e) { + assert(e); + post(*e); } -Event::~Event() {} - -void Event::prepare(EventHandler& handler) +Event* EventChannel::wait(Time timeoutNs) { - handler.mqPut(this); + return impl->wait(timeoutNs); } -bool Event::hasError() const { - return error; -} -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); +// ================================================================ +// Event and subclasses. + +Event::~Event() {} + +Exception::shared_ptr_const Event::getException() const { + return exception; } -Event* Event::complete(EventHandler&) -{ - return this; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } void Event::dispatch() { + if (!callback.empty()) + callback(); +} + +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; + +void ReadEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -void ReadEvent::prepare(EventHandler& handler) +Event* ReadEvent::complete(EventChannel::Descriptor& ed) { - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::read(descriptor, + static_cast<char*>(buffer) + bytesRead, + size - bytesRead); + + if (n < 0 && errno != EAGAIN) { // Error + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } + else if (n == 0) { // End of file + // TODO aconway 2006-12-13: Don't treat EOF as exception + // unless we're partway thru a !noWait read. + setException(QPID_POSIX_ERROR(ENODATA)); + ed.shutdownUnsafe(); // Called with lock held. + } + else { + if (n > 0) // possible that n < 0 && errno == EAGAIN + bytesRead += n; + if (bytesRead < size && !noWait) { + // Continue reading, not enough data. + return 0; + } + } + return this; } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; + +void WriteEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(OUT).push(this); } -Event* ReadEvent::complete(EventHandler& handler) + +Event* WriteEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::write(descriptor, + static_cast<const char*>(buffer) + bytesWritten, + size - bytesWritten); + if(n < 0 && errno == EAGAIN && noWait) { return 0; } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + if (n < 0 || (bytesWritten += n) < size) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } + return this; } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +void AcceptEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* WriteEvent::complete(EventHandler& handler) +Event* AcceptEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +void DispatchEvent::prepare(EventChannel::Impl& impl) { + impl.getDispatchQueue().push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +Event* DispatchEvent::complete(EventChannel::Descriptor&) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); return this; } |