summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannel.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannel.cpp679
1 files changed, 475 insertions, 204 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp
index 6db397a165..d35eedf5a5 100644
--- a/cpp/src/qpid/sys/posix/EventChannel.cpp
+++ b/cpp/src/qpid/sys/posix/EventChannel.cpp
@@ -1,4 +1,4 @@
-/*
+/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
@@ -16,6 +16,19 @@
*
*/
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
+#include "EventChannel.h"
+#include "check.h"
+
+#include "qpid/QpidError.h"
+#include "qpid/sys/AtomicCount.h"
+
#include <mqueue.h>
#include <string.h>
#include <iostream>
@@ -29,139 +42,420 @@
#include <queue>
#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
-
-#include "qpid/QpidError.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/log/Statement.h"
-
-#include "check.h"
-#include "EventChannel.h"
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
using namespace std;
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
namespace qpid {
namespace sys {
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+
+typedef std::pair<Event*, Event*> EventPair;
+
/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
+ * Template to zero out a C-struct on construction. Avoids uninitialized memory
+ * warnings from valgrind or other mem checking tool.
+ */
+template <class T> struct CleanStruct : public T {
+ CleanStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+} // namespace
+
+/**
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
{
public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
+ Queue(Descriptor& container, Direction dir);
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
+ /** Called by Event classes in prepare() */
+ void push(Event* e);
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
+ /** Called when epoll wakes.
+ *@return The next completed event or 0.
+ */
+ Event* wake(uint32_t epollFlags);
+
+ Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
+
+ bool empty() { return queue.empty(); }
+
+ void setBit(uint32_t &epollFlags);
+
+ void shutdown();
private:
+ typedef std::deque<Event*> EventQ;
+
+ inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
+
+ Mutex& lock; // Shared with Descriptor.
+ Descriptor& descriptor;
+ uint32_t myEvent; // Epoll event flag.
+ EventQ queue;
+};
+
+
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+ public:
+ explicit Descriptor(int fd) : epollFd(-1), myFd(fd),
+ inQueue(*this, IN), outQueue(*this, OUT) {}
+
+ void activate(int epollFd_);
+
+ /** Epoll woke up for this descriptor. */
+ Event* wake(uint32_t epollEvents);
+
+ /** Shut down: close and remove file descriptor.
+ * May be re-activated if fd is reused.
+ */
+ void shutdown();
+
+ // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+ void shutdownUnsafe();
+
+ bool isShutdown() { return epollFd == -1; }
+
+ Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+ int getFD() const { return myFd; }
+
+ private:
+ void update();
+ void epollCtl(int op, uint32_t events);
+ Queue* pick();
+
+ Mutex lock;
int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
+ int myFd;
+ Queue inQueue, outQueue;
+ bool preferIn;
+
+ friend class Queue;
};
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+/**
+ * Holds a map of Descriptors, which do most of the work.
+ */
+class EventChannel::Impl {
+ public:
+ Impl(int size = 256);
+
+ ~Impl();
+
+ /**
+ * Activate descriptor
+ */
+ void activate(Descriptor& d) {
+ d.activate(epollFd);
+ }
+
+ /** Wait for an event, return 0 on timeout */
+ Event* wait(Duration timeout);
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
+ void shutdown();
+
+ private:
+
+ Monitor monitor;
+ int epollFd;
+ int shutdownPipe[2];
+ AtomicCount nWaiters;
+ bool isShutdown;
+};
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
+ lock(d.lock), descriptor(d),
+ myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+ Mutex::ScopedLock l(lock);
+ if (descriptor.isShutdown())
+ THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+ queue.push_back(e);
+ descriptor.update();
+}
+
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+ if (queue.empty())
+ epollFlags &= ~myEvent;
+ else
+ epollFlags |= myEvent;
+}
+
+// TODO aconway 2006-12-20: REMOVE
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+ // Called with lock held.
+ if (!queue.empty() && (isMyEvent(epollFlags))) {
+ assert(!queue.empty());
+ Event* e = queue.front();
+ assert(e);
+ if (!e->getException()) {
+ // TODO aconway 2006-12-20: Can/should we move event completion
+ // out into dispatch() so it doesn't happen in Descriptor locks?
+ e->complete(descriptor);
+ }
+ queue.pop_front();
+ return e;
+ }
+ return 0;
+}
+
+void EventChannel::Queue::shutdown() {
+ // Mark all pending events with a shutdown exception.
+ // The server threads will remove and dispatch the events.
//
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
+ qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE);
+ for_each(queue.begin(), queue.end(),
+ boost::bind(&Event::setException, _1, ex));
+}
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_) {
+ Mutex::ScopedLock l(lock);
+ if (isShutdown()) {
+ epollFd = epollFd_; // We're back in business.
+ epollCtl(EPOLL_CTL_ADD, 0);
+ }
}
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
+void EventChannel::Descriptor::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownUnsafe();
}
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+void EventChannel::Descriptor::shutdownUnsafe() {
+ // Caller holds lock.
+ ::close(myFd);
+ epollFd = -1; // Mark myself as shutdown.
+ inQueue.shutdown();
+ outQueue.shutdown();
+}
+
+// TODO aconway 2006-12-20: Inline into wake().
+void EventChannel::Descriptor::update() {
+ // Caller holds lock.
+ if (isShutdown()) // Nothing to do
+ return;
+ uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+ inQueue.setBit(events);
+ outQueue.setBit(events);
+ epollCtl(EPOLL_CTL_MOD, events);
+}
+
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+ // Caller holds lock
+ assert(!isShutdown());
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = this;
+ ee.events = events;
+ int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+ if (status < 0) {
+ if (errno == EEXIST) // It's okay to add an existing fd
+ return;
+ else if (errno == EBADF) // FD was closed externally.
+ shutdownUnsafe();
+ else
+ throw QPID_POSIX_ERROR(errno);
+ }
}
+
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
+EventChannel::Queue* EventChannel::Descriptor::pick() {
+ if (inQueue.empty() && outQueue.empty())
return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
+ if (inQueue.empty() || outQueue.empty())
+ return !inQueue.empty() ? &inQueue : &outQueue;
+ // Neither is empty, pick fairly.
+ preferIn = !preferIn;
+ return preferIn ? &inQueue : &outQueue;
}
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
+ Mutex::ScopedLock l(lock);
+ // On error, shut down the Descriptor and both queues.
+ if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+ shutdownUnsafe();
+ // TODO aconway 2006-12-20: This error handling models means
+ // that any error reported by epoll will result in a shutdown
+ // exception on the events. Can we get more accurate error
+ // reporting somehow?
+ }
+ Queue*q = 0;
+ bool in = (epollEvents & EPOLLIN);
+ bool out = (epollEvents & EPOLLOUT);
+ if ((in && out) || isShutdown())
+ q = pick(); // Choose fairly, either non-empty queue.
+ else if (in)
+ q = &inQueue;
+ else if (out)
+ q = &outQueue;
+ Event* e = (q && !q->empty()) ? q->pop() : 0;
+ update();
+ if (e)
+ e->complete(*this);
+ return e;
}
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+ epollFd(-1), isShutdown(false)
{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+ // Create the epoll file descriptor.
+ epollFd = epoll_create(epollSize);
+ QPID_POSIX_CHECK(epollFd);
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // We activate the FD when there are messages in the queue.
+ QPID_POSIX_CHECK(::pipe(shutdownPipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
}
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
+EventChannel::Impl::~Impl() {
+ shutdown();
+ ::close(epollFd);
+ ::close(shutdownPipe[0]);
+ ::close(shutdownPipe[1]);
}
-Event* EventHandler::complete(EventHandler& eh)
-{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
+
+void EventChannel::Impl::shutdown() {
+ Monitor::ScopedLock l(monitor);
+ if (!isShutdown) { // I'm starting shutdown.
+ isShutdown = true;
+ if (nWaiters == 0)
+ return;
+
+ // TODO aconway 2006-12-20: If I just close the epollFd will
+ // that wake all threads? If so with what? Would be simpler than:
+
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = 0;
+ ee.events = EPOLLIN;
+ QPID_POSIX_CHECK(
+ epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
+ }
+ // Wait for nWaiters to get out.
+ while (nWaiters > 0) {
+ monitor.wait();
+ }
+}
+
+// TODO aconway 2006-12-20: DEBUG remove
+struct epoll {
+ epoll(uint32_t e) : events(e) { }
+ uint32_t events;
+};
+
+#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
+ostream& operator << (ostream& out, epoll e) {
+ out << "epoll_event.events: ";
+ BIT(EPOLLIN);
+ BIT(EPOLLPRI);
+ BIT(EPOLLOUT);
+ BIT(EPOLLRDNORM);
+ BIT(EPOLLRDBAND);
+ BIT(EPOLLWRNORM);
+ BIT(EPOLLWRBAND);
+ BIT(EPOLLMSG);
+ BIT(EPOLLERR);
+ BIT(EPOLLHUP);
+ BIT(EPOLLONESHOT);
+ BIT(EPOLLET);
+ return out;
}
+
+
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Duration timeoutNs)
+{
+ {
+ Monitor::ScopedLock l(monitor);
+ if (isShutdown)
+ throw ShutdownException();
+ }
+
+ // Increase nWaiters for the duration, notify the monitor if I'm
+ // the last one out.
+ //
+ AtomicCount::ScopedIncrement si(
+ nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
+
+ // No lock, all thread safe calls or local variables:
+ //
+ const long timeoutMs =
+ (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+ CleanStruct<epoll_event> ee;
+ Event* event = 0;
+
+ // Loop till we get a completed event. Some events may repost
+ // themselves and return 0, e.g. incomplete read or write events.
+ //TODO aconway 2006-12-20: FIX THIS!
+ while (!event) {
+ int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+ if (n == 0) // Timeout
+ return 0;
+ if (n < 0 && errno == EINTR) // Interrupt, ignore it.
+ continue;
+ if (n < 0)
+ throw QPID_POSIX_ERROR(errno);
+ assert(n == 1);
+ Descriptor* ed =
+ reinterpret_cast<Descriptor*>(ee.data.ptr);
+ if (ed == 0) // We're being shut-down.
+ throw ShutdownException();
+ assert(ed != 0);
+ event = ed->wake(ee.events);
+ }
+ return event;
+}
+
+//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+// Mutex::ScopedLock l(monitor);
+// Descriptor& ed = descriptors[fd];
+// ed.activate(epollFd, fd);
+// return ed;
+//}
+
+
// ================================================================
// EventChannel
@@ -169,157 +463,134 @@ EventChannel::shared_ptr EventChannel::create() {
return shared_ptr(new EventChannel());
}
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
EventChannel::~EventChannel() {}
-void EventChannel::postEvent(Event& e)
+void EventChannel::post(Event& e)
{
- e.prepare(*handler);
+ e.prepare(*impl);
}
-Event* EventChannel::getEvent()
+Event* EventChannel::wait(Duration timeoutNs)
{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
+ return impl->wait(timeoutNs);
+}
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- QPID_LOG(warn, "Ignoring error: "
- << PosixError::getMessage(errno));
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
+void EventChannel::shutdown() {
+ impl->shutdown();
}
+
+// ================================================================
+// Event and subclasses.
+
Event::~Event() {}
-void Event::prepare(EventHandler& handler)
-{
- handler.mqPut(this);
+Exception::shared_ptr_const Event::getException() const {
+ return exception;
}
-bool Event::hasError() const {
- return error;
+void Event::throwIfException() {
+ if (getException())
+ exception->throwSelf();
}
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
+void Event::dispatch()
{
- return this;
+ if (!callback.empty())
+ callback();
}
-void Event::dispatch()
-{
+void Event::setException(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex)
+ exception.reset(ex->clone().release());
+ else
+ exception.reset(new Exception(e));
+#ifndef NDEBUG
+ // Throw and re-catch the exception. Has no effect on the
+ // program but it triggers debuggers watching for throw. The
+ // context that sets the exception is more informative for
+ // debugging purposes than the one that ultimately throws it.
+ //
try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ throwIfException();
}
+ catch (...) { } // Ignored.
+#endif
}
-void Event::setError(const ExceptionHolder& e) {
- error = e;
+int FDEvent::getFDescriptor() const {
+ return descriptor.getFD();
}
-void ReadEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak
+ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) :
+ IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) {
}
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(IN).push(this);
}
-Event* ReadEvent::complete(EventHandler& handler)
+void ReadEvent::complete(EventChannel::Descriptor& ed)
{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
- return 0;
- }
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ ssize_t n = ::read(getFDescriptor(),
+ static_cast<char*>(buffer) + bytesRead,
+ size - bytesRead);
+ if (n > 0)
+ bytesRead += n;
+ if (n == 0 || (n < 0 && errno != EAGAIN)) {
+ // Use ENODATA for file closed.
+ setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
+ ed.shutdownUnsafe();
}
}
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) :
+ IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) {
+}
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(OUT).push(this);
}
-Event* WriteEvent::complete(EventHandler& handler)
+
+void WriteEvent::complete(EventChannel::Descriptor& ed)
{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
+ ssize_t n = ::write(getFDescriptor(),
+ static_cast<const char*>(buffer) + bytesWritten,
+ size - bytesWritten);
+ if (n > 0)
+ bytesWritten += n;
+ if(n < 0 && errno != EAGAIN) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
- return this;
}
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+AcceptEvent::AcceptEvent(int fd, Callback cb) :
+ FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) {
+}
+
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(IN).push(this);
}
-Event* AcceptEvent::complete(EventHandler& handler)
+void AcceptEvent::complete(EventChannel::Descriptor& ed)
{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
- return this;
+ accepted = ::accept(getFDescriptor(), 0, 0);
+ if (accepted < 0) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
}
}}