summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp244
1 files changed, 191 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
index 783f84576b..f12012cbb0 100644
--- a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
@@ -30,9 +30,11 @@
#include <port.h>
#include <poll.h>
#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
#include <assert.h>
-#include <vector>
+#include <queue>
#include <exception>
@@ -43,11 +45,11 @@ namespace qpid {
namespace sys {
// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
-DeletionManager<PollerHandle> PollerHandleDeletionManager;
+DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
// Instantiate (and define) class static for DeletionManager
template <>
-DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0);
+DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
class PollerHandlePrivate {
friend class Poller;
@@ -58,7 +60,8 @@ class PollerHandlePrivate {
MONITORED,
INACTIVE,
HUNGUP,
- MONITORED_HUNGUP
+ MONITORED_HUNGUP,
+ DELETED
};
int fd;
@@ -104,6 +107,14 @@ class PollerHandlePrivate {
assert(stat == MONITORED);
stat = HUNGUP;
}
+
+ bool isDeleted() const {
+ return stat == DELETED;
+ }
+
+ void setDeleted() {
+ stat = DELETED;
+ }
};
PollerHandle::PollerHandle(const IOHandle& h) :
@@ -111,11 +122,16 @@ PollerHandle::PollerHandle(const IOHandle& h) :
{}
PollerHandle::~PollerHandle() {
- delete impl;
-}
-
-void PollerHandle::deferDelete() {
- PollerHandleDeletionManager.markForDeletion(this);
+ {
+ ScopedLock<Mutex> l(impl->lock);
+ if (impl->isDeleted()) {
+ return;
+ }
+ if (impl->isActive()) {
+ impl->setDeleted();
+ }
+ }
+ PollerHandleDeletionManager.markForDeletion(impl);
}
/**
@@ -125,35 +141,82 @@ void PollerHandle::deferDelete() {
class PollerPrivate {
friend class Poller;
- const int portId;
+ class InterruptHandle: public PollerHandle {
+ std::queue<PollerHandle*> handles;
+
+ void processEvent(Poller::EventType) {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ assert(handle);
+
+ //Synthesise event
+ Poller::Event event(handle, Poller::INTERRUPTED);
+
+ //Process synthesised event
+ event.process();
+ }
+ public:
+ InterruptHandle() : PollerHandle(DummyIOHandle) {}
+
+ void addHandle(PollerHandle& h) {
+ handles.push(&h);
+ }
+
+ PollerHandle *getHandle() {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ return handle;
+ }
+
+ bool queuedHandles() {
+ return handles.size() > 0;
+ }
+ };
+
+ const int portId;
+ bool isShutdown;
+ InterruptHandle interruptHandle;
+
static uint32_t directionToPollEvent(Poller::Direction dir) {
switch (dir) {
- case Poller::INPUT: return POLLIN;
- case Poller::OUTPUT: return POLLOUT;
- case Poller::INOUT: return POLLIN | POLLOUT;
- default: return 0;
+ case Poller::INPUT: return POLLIN;
+ case Poller::OUTPUT: return POLLOUT;
+ case Poller::INOUT: return POLLIN | POLLOUT;
+ default: return 0;
}
}
static Poller::EventType pollToDirection(uint32_t events) {
uint32_t e = events & (POLLIN | POLLOUT);
switch (e) {
- case POLLIN: return Poller::READABLE;
- case POLLOUT: return Poller::WRITABLE;
- case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
- default:
- return (events & (POLLHUP | POLLERR)) ?
- Poller::DISCONNECTED : Poller::INVALID;
+ case POLLIN: return Poller::READABLE;
+ case POLLOUT: return Poller::WRITABLE;
+ case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+ default:
+ return (events & (POLLHUP | POLLERR)) ?
+ Poller::DISCONNECTED : Poller::INVALID;
}
}
-
+
PollerPrivate() :
- portId(::port_create()) {
+ portId(::port_create()),
+ isShutdown(false) {
+ QPID_POSIX_CHECK(portId);
+ QPID_LOG(trace, "port_create returned port Id: " << portId);
}
~PollerPrivate() {
}
+
+ void interrupt() {
+ //Send an Alarm to the port
+ //We need to send a nonzero event mask, using POLLHUP,
+ //nevertheless the wait method will only look for a PORT_ALERT_SET
+ QPID_LOG(trace, "Sending a port_alert to " << portId);
+ QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP,
+ &static_cast<PollerHandle&>(interruptHandle)));
+ }
};
void Poller::addFd(PollerHandle& handle, Direction dir) {
@@ -177,7 +240,6 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
QPID_LOG(trace, "Poller::addFd(handle=" << &handle
<< "[" << typeid(&handle).name()
<< "], fd=" << eh.fd << ")");
- //assert(dynamic_cast<DispatchHandle*>(&handle));
}
void Poller::delFd(PollerHandle& handle) {
@@ -223,17 +285,56 @@ void Poller::rearmFd(PollerHandle& handle) {
}
void Poller::shutdown() {
- //Send an Alarm to the port
- //We need to send a nonzero event mask, using POLLHUP, but
- //The wait method will only look for a PORT_ALERT_SET
- QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL));
- QPID_LOG(trace, "Poller::shutdown");
+ //Allow sloppy code to shut us down more than once
+ if (impl->isShutdown)
+ return;
+
+ impl->isShutdown = true;
+ impl->interrupt();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+ PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+ PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
+ ScopedLock<Mutex> l(eh.lock);
+ ih.addHandle(handle);
+ impl->interrupt();
+ eh.setActive();
+ return true;
+}
+
+void Poller::run() {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+ } while (true);
}
Poller::Event Poller::wait(Duration timeout) {
timespec_t tout;
timespec_t* ptout = NULL;
port_event_t pe;
+
+ AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE :
+ AbsTime(now(), timeout);
if (timeout != TIME_INFINITE) {
tout.tv_sec = 0;
@@ -243,12 +344,21 @@ Poller::Event Poller::wait(Duration timeout) {
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
- QPID_LOG(trace, "About to enter port_get. Thread "
- << pthread_self()
+ QPID_LOG(trace, "About to enter port_get on " << impl->portId
+ << ". Thread " << pthread_self()
<< ", timeout=" << timeout);
-
+
+
int rc = ::port_get(impl->portId, &pe, ptout);
+ QPID_LOG(trace, "port_get on " << impl->portId
+ << " returned " << rc);
+
+ if (impl->isShutdown) {
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ return Event(0, SHUTDOWN);
+ }
+
if (rc < 0) {
switch (errno) {
case EINTR:
@@ -259,33 +369,61 @@ Poller::Event Poller::wait(Duration timeout) {
QPID_POSIX_CHECK(rc);
}
} else {
- //We use alert mode to notify the shutdown of the Poller
- if (pe.portev_source == PORT_SOURCE_ALERT) {
- return Event(0, SHUTDOWN);
- }
- if (pe.portev_source == PORT_SOURCE_FD) {
- PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user);
- PollerHandlePrivate& eh = *handle->impl;
- ScopedLock<Mutex> l(eh.lock);
- QPID_LOG(trace, "About to send handle: " << handle);
+ PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user);
+ PollerHandlePrivate& eh = *handle->impl;
+ ScopedLock<Mutex> l(eh.lock);
+
+ if (eh.isActive()) {
+ QPID_LOG(trace, "Handle is active");
+ //We use alert mode to notify interrupts
+ if (pe.portev_source == PORT_SOURCE_ALERT &&
+ handle == &impl->interruptHandle) {
+ QPID_LOG(trace, "Interrupt notified");
+
+ PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
+
+ if (impl->interruptHandle.queuedHandles()) {
+ impl->interrupt();
+ eh.setActive();
+ } else {
+ eh.setInactive();
+ }
+ return Event(wrappedHandle, INTERRUPTED);
+ }
- if (eh.isActive()) {
- if (pe.portev_events & POLLHUP) {
- if (eh.isHungup()) {
- return Event(handle, DISCONNECTED);
+ if (pe.portev_source == PORT_SOURCE_FD) {
+ QPID_LOG(trace, "About to send handle: " << handle);
+ if (pe.portev_events & POLLHUP) {
+ if (eh.isHungup()) {
+ return Event(handle, DISCONNECTED);
+ }
+ eh.setHungup();
+ } else {
+ eh.setInactive();
}
- eh.setHungup();
- } else {
- eh.setInactive();
- }
- QPID_LOG(trace, "Sending event (thread: "
- << pthread_self() << ") for handle " << handle
- << ", direction= "
- << PollerPrivate::pollToDirection(pe.portev_events));
- return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
+ QPID_LOG(trace, "Sending event (thread: "
+ << pthread_self() << ") for handle " << handle
+ << ", direction= "
+ << PollerPrivate::pollToDirection(pe.portev_events));
+ return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
+ }
+ } else if (eh.isDeleted()) {
+ //Remove the handle from the poller
+ int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD,
+ (uintptr_t) eh.fd);
+ if (rc == -1 && errno != EBADFD) {
+ QPID_POSIX_CHECK(rc);
}
}
}
+
+ if (timeout == TIME_INFINITE) {
+ continue;
+ }
+ if (rc == 0 && now() > targetTimeout) {
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ return Event(0, TIMEOUT);
+ }
} while (true);
}