diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp | 244 |
1 files changed, 191 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp index 783f84576b..f12012cbb0 100644 --- a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp +++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp @@ -30,9 +30,11 @@ #include <port.h> #include <poll.h> #include <errno.h> +#include <pthread.h> +#include <signal.h> #include <assert.h> -#include <vector> +#include <queue> #include <exception> @@ -43,11 +45,11 @@ namespace qpid { namespace sys { // Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used -DeletionManager<PollerHandle> PollerHandleDeletionManager; +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; // Instantiate (and define) class static for DeletionManager template <> -DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0); +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); class PollerHandlePrivate { friend class Poller; @@ -58,7 +60,8 @@ class PollerHandlePrivate { MONITORED, INACTIVE, HUNGUP, - MONITORED_HUNGUP + MONITORED_HUNGUP, + DELETED }; int fd; @@ -104,6 +107,14 @@ class PollerHandlePrivate { assert(stat == MONITORED); stat = HUNGUP; } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } }; PollerHandle::PollerHandle(const IOHandle& h) : @@ -111,11 +122,16 @@ PollerHandle::PollerHandle(const IOHandle& h) : {} PollerHandle::~PollerHandle() { - delete impl; -} - -void PollerHandle::deferDelete() { - PollerHandleDeletionManager.markForDeletion(this); + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + if (impl->isActive()) { + impl->setDeleted(); + } + } + PollerHandleDeletionManager.markForDeletion(impl); } /** @@ -125,35 +141,82 @@ void PollerHandle::deferDelete() { class PollerPrivate { friend class Poller; - const int portId; + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + //Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + //Process synthesised event + event.process(); + } + public: + InterruptHandle() : PollerHandle(DummyIOHandle) {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle *getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + + const int portId; + bool isShutdown; + InterruptHandle interruptHandle; + static uint32_t directionToPollEvent(Poller::Direction dir) { switch (dir) { - case Poller::INPUT: return POLLIN; - case Poller::OUTPUT: return POLLOUT; - case Poller::INOUT: return POLLIN | POLLOUT; - default: return 0; + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; } } static Poller::EventType pollToDirection(uint32_t events) { uint32_t e = events & (POLLIN | POLLOUT); switch (e) { - case POLLIN: return Poller::READABLE; - case POLLOUT: return Poller::WRITABLE; - case POLLIN | POLLOUT: return Poller::READ_WRITABLE; - default: - return (events & (POLLHUP | POLLERR)) ? - Poller::DISCONNECTED : Poller::INVALID; + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; } } - + PollerPrivate() : - portId(::port_create()) { + portId(::port_create()), + isShutdown(false) { + QPID_POSIX_CHECK(portId); + QPID_LOG(trace, "port_create returned port Id: " << portId); } ~PollerPrivate() { } + + void interrupt() { + //Send an Alarm to the port + //We need to send a nonzero event mask, using POLLHUP, + //nevertheless the wait method will only look for a PORT_ALERT_SET + QPID_LOG(trace, "Sending a port_alert to " << portId); + QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP, + &static_cast<PollerHandle&>(interruptHandle))); + } }; void Poller::addFd(PollerHandle& handle, Direction dir) { @@ -177,7 +240,6 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { QPID_LOG(trace, "Poller::addFd(handle=" << &handle << "[" << typeid(&handle).name() << "], fd=" << eh.fd << ")"); - //assert(dynamic_cast<DispatchHandle*>(&handle)); } void Poller::delFd(PollerHandle& handle) { @@ -223,17 +285,56 @@ void Poller::rearmFd(PollerHandle& handle) { } void Poller::shutdown() { - //Send an Alarm to the port - //We need to send a nonzero event mask, using POLLHUP, but - //The wait method will only look for a PORT_ALERT_SET - QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL)); - QPID_LOG(trace, "Poller::shutdown"); + //Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + impl->isShutdown = true; + impl->interrupt(); +} + +bool Poller::interrupt(PollerHandle& handle) { + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); } Poller::Event Poller::wait(Duration timeout) { timespec_t tout; timespec_t* ptout = NULL; port_event_t pe; + + AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE : + AbsTime(now(), timeout); if (timeout != TIME_INFINITE) { tout.tv_sec = 0; @@ -243,12 +344,21 @@ Poller::Event Poller::wait(Duration timeout) { do { PollerHandleDeletionManager.markAllUnusedInThisThread(); - QPID_LOG(trace, "About to enter port_get. Thread " - << pthread_self() + QPID_LOG(trace, "About to enter port_get on " << impl->portId + << ". Thread " << pthread_self() << ", timeout=" << timeout); - + + int rc = ::port_get(impl->portId, &pe, ptout); + QPID_LOG(trace, "port_get on " << impl->portId + << " returned " << rc); + + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } + if (rc < 0) { switch (errno) { case EINTR: @@ -259,33 +369,61 @@ Poller::Event Poller::wait(Duration timeout) { QPID_POSIX_CHECK(rc); } } else { - //We use alert mode to notify the shutdown of the Poller - if (pe.portev_source == PORT_SOURCE_ALERT) { - return Event(0, SHUTDOWN); - } - if (pe.portev_source == PORT_SOURCE_FD) { - PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user); - PollerHandlePrivate& eh = *handle->impl; - ScopedLock<Mutex> l(eh.lock); - QPID_LOG(trace, "About to send handle: " << handle); + PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user); + PollerHandlePrivate& eh = *handle->impl; + ScopedLock<Mutex> l(eh.lock); + + if (eh.isActive()) { + QPID_LOG(trace, "Handle is active"); + //We use alert mode to notify interrupts + if (pe.portev_source == PORT_SOURCE_ALERT && + handle == &impl->interruptHandle) { + QPID_LOG(trace, "Interrupt notified"); + + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } - if (eh.isActive()) { - if (pe.portev_events & POLLHUP) { - if (eh.isHungup()) { - return Event(handle, DISCONNECTED); + if (pe.portev_source == PORT_SOURCE_FD) { + QPID_LOG(trace, "About to send handle: " << handle); + if (pe.portev_events & POLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); } - eh.setHungup(); - } else { - eh.setInactive(); - } - QPID_LOG(trace, "Sending event (thread: " - << pthread_self() << ") for handle " << handle - << ", direction= " - << PollerPrivate::pollToDirection(pe.portev_events)); - return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + QPID_LOG(trace, "Sending event (thread: " + << pthread_self() << ") for handle " << handle + << ", direction= " + << PollerPrivate::pollToDirection(pe.portev_events)); + return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + } + } else if (eh.isDeleted()) { + //Remove the handle from the poller + int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, + (uintptr_t) eh.fd); + if (rc == -1 && errno != EBADFD) { + QPID_POSIX_CHECK(rc); } } } + + if (timeout == TIME_INFINITE) { + continue; + } + if (rc == 0 && now() > targetTimeout) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, TIMEOUT); + } } while (true); } |