diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp | 444 |
1 files changed, 444 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp new file mode 100644 index 0000000000..06d542c938 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp @@ -0,0 +1,444 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Logger.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/DeletionManager.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include <port.h> +#include <poll.h> +#include <errno.h> +#include <pthread.h> +#include <signal.h> + +#include <assert.h> +#include <queue> +#include <exception> + + +//TODO: Remove this +#include "qpid/sys/Dispatcher.h" + +namespace qpid { +namespace sys { + +// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; + +// Instantiate (and define) class static for DeletionManager +template <> +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE, + HUNGUP, + MONITORED_HUNGUP, + DELETED + }; + + int fd; + uint32_t events; + FDStat stat; + Mutex lock; + + PollerHandlePrivate(int f) : + fd(f), + events(0), + stat(ABSENT) { + } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return stat == MONITORED_HUNGUP || stat == HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toFd(h.impl))) +{} + +PollerHandle::~PollerHandle() { + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + if (impl->isActive()) { + impl->setDeleted(); + } + } + PollerHandleDeletionManager.markForDeletion(impl); +} + +/** + * Concrete implementation of Poller to use the Solaris Event Completion + * Framework interface + */ +class PollerPrivate { + friend class Poller; + + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + //Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + //Process synthesised event + event.process(); + } + + public: + InterruptHandle() : PollerHandle(DummyIOHandle) {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle *getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + + const int portId; + bool isShutdown; + InterruptHandle interruptHandle; + + static uint32_t directionToPollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; + } + } + + static Poller::EventType pollToDirection(uint32_t events) { + uint32_t e = events & (POLLIN | POLLOUT); + switch (e) { + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; + } + } + + PollerPrivate() : + portId(::port_create()), + isShutdown(false) { + QPID_POSIX_CHECK(portId); + QPID_LOG(trace, "port_create returned port Id: " << portId); + } + + ~PollerPrivate() { + } + + void interrupt() { + //Send an Alarm to the port + //We need to send a nonzero event mask, using POLLHUP, + //nevertheless the wait method will only look for a PORT_ALERT_SET + QPID_LOG(trace, "Sending a port_alert to " << portId); + QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP, + &static_cast<PollerHandle&>(interruptHandle))); + } +}; + +void Poller::addFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + + uint32_t events = 0; + + if (eh.isIdle()) { + events = PollerPrivate::directionToPollEvent(dir); + } else { + assert(eh.isActive()); + events = eh.events | PollerPrivate::directionToPollEvent(dir); + } + + //port_associate can be used to add an association or modify an + //existing one + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, events, &handle)); + eh.events = events; + eh.setActive(); + QPID_LOG(trace, "Poller::addFd(handle=" << &handle + << "[" << typeid(&handle).name() + << "], fd=" << eh.fd << ")"); +} + +void Poller::delFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd); + //Allow closing an invalid fd, allowing users to close fd before + //doing delFd() + if (rc == -1 && errno != EBADFD) { + QPID_POSIX_CHECK(rc); + } + eh.setIdle(); + QPID_LOG(trace, "Poller::delFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +// modFd is equivalent to delFd followed by addFd +void Poller::modFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + eh.events = PollerPrivate::directionToPollEvent(dir); + + //If fd is already associated, events and user arguments are updated + //So, no need to check if fd is already associated + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle)); + eh.setActive(); + QPID_LOG(trace, "Poller::modFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +void Poller::rearmFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.isInactive()); + + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle)); + eh.setActive(); + QPID_LOG(trace, "Poller::rearmdFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +void Poller::shutdown() { + //Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + impl->isShutdown = true; + impl->interrupt(); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +bool Poller::interrupt(PollerHandle& handle) { + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); +} + +Poller::Event Poller::wait(Duration timeout) { + timespec_t tout; + timespec_t* ptout = NULL; + port_event_t pe; + + AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE : + AbsTime(now(), timeout); + + if (timeout != TIME_INFINITE) { + tout.tv_sec = 0; + tout.tv_nsec = timeout; + ptout = &tout; + } + + do { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + QPID_LOG(trace, "About to enter port_get on " << impl->portId + << ". Thread " << pthread_self() + << ", timeout=" << timeout); + + + int rc = ::port_get(impl->portId, &pe, ptout); + + QPID_LOG(trace, "port_get on " << impl->portId + << " returned " << rc); + + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } + + if (rc < 0) { + switch (errno) { + case EINTR: + continue; + case ETIME: + return Event(0, TIMEOUT); + default: + QPID_POSIX_CHECK(rc); + } + } else { + PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user); + PollerHandlePrivate& eh = *handle->impl; + ScopedLock<Mutex> l(eh.lock); + + if (eh.isActive()) { + QPID_LOG(trace, "Handle is active"); + //We use alert mode to notify interrupts + if (pe.portev_source == PORT_SOURCE_ALERT && + handle == &impl->interruptHandle) { + QPID_LOG(trace, "Interrupt notified"); + + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } + + if (pe.portev_source == PORT_SOURCE_FD) { + QPID_LOG(trace, "About to send handle: " << handle); + if (pe.portev_events & POLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } + QPID_LOG(trace, "Sending event (thread: " + << pthread_self() << ") for handle " << handle + << ", direction= " + << PollerPrivate::pollToDirection(pe.portev_events)); + return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + } + } else if (eh.isDeleted()) { + //Remove the handle from the poller + int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, + (uintptr_t) eh.fd); + if (rc == -1 && errno != EBADFD) { + QPID_POSIX_CHECK(rc); + } + } + } + + if (timeout == TIME_INFINITE) { + continue; + } + if (rc == 0 && now() > targetTimeout) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, TIMEOUT); + } + } while (true); +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} |