diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-07-30 06:29:51 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-07-30 06:29:51 +0000 |
commit | 7166856e59dde9d6590d8abe54f0deb73b751282 (patch) | |
tree | e236a06e12ad2c89ddba6b38749afdb04c8eb6c5 | |
parent | f1f80e7b3b1e17e663113382219d992680461063 (diff) | |
download | qpid-python-7166856e59dde9d6590d8abe54f0deb73b751282.tar.gz |
QPID-1198: Solaris ECF (port) based Poller
Patch from Manuel Teira
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@680921 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/configure.ac | 93 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 11 | ||||
-rw-r--r-- | cpp/src/qpid/sys/solaris/ECFPoller.cpp | 301 |
3 files changed, 378 insertions, 27 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 85c6aed024..1aa0a6fe63 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -54,31 +54,45 @@ AC_ARG_ENABLE(warnings, esac], [enableval=yes]) -# Warnings: Enable as many as possible, keep the code clean. Please -# do not disable warnings or remove -Werror without discussing on -# qpid-dev list. -# -# The following warnings are deliberately omitted, they warn on valid code. -# -Wunreachable-code -Wpadded -Winline -# -Wshadow - warns about boost headers. - -if test "${enableval}" = yes; then - gl_COMPILER_FLAGS(-Werror) - gl_COMPILER_FLAGS(-pedantic) - gl_COMPILER_FLAGS(-Wall) - gl_COMPILER_FLAGS(-Wextra) - gl_COMPILER_FLAGS(-Wno-shadow) - gl_COMPILER_FLAGS(-Wpointer-arith) - gl_COMPILER_FLAGS(-Wcast-qual) - gl_COMPILER_FLAGS(-Wcast-align) - gl_COMPILER_FLAGS(-Wno-long-long) - gl_COMPILER_FLAGS(-Wvolatile-register-var) - gl_COMPILER_FLAGS(-Winvalid-pch) - gl_COMPILER_FLAGS(-Wno-system-headers) - gl_COMPILER_FLAGS(-Woverloaded-virtual) - AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS]) - AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.]) - COMPILER_FLAGS= +# Set up for gcc as compiler +if test x$GXX = xyes; then + # Warnings: Enable as many as possible, keep the code clean. Please + # do not disable warnings or remove -Werror without discussing on + # qpid-dev list. + # + # The following warnings are deliberately omitted, they warn on valid code. + # -Wunreachable-code -Wpadded -Winline + # -Wshadow - warns about boost headers. + if test "${enableval}" = yes; then + gl_COMPILER_FLAGS(-Werror) + gl_COMPILER_FLAGS(-pedantic) + gl_COMPILER_FLAGS(-Wall) + gl_COMPILER_FLAGS(-Wextra) + gl_COMPILER_FLAGS(-Wno-shadow) + gl_COMPILER_FLAGS(-Wpointer-arith) + gl_COMPILER_FLAGS(-Wcast-qual) + gl_COMPILER_FLAGS(-Wcast-align) + gl_COMPILER_FLAGS(-Wno-long-long) + gl_COMPILER_FLAGS(-Wvolatile-register-var) + gl_COMPILER_FLAGS(-Winvalid-pch) + gl_COMPILER_FLAGS(-Wno-system-headers) + gl_COMPILER_FLAGS(-Woverloaded-virtual) + AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS]) + AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.]) + COMPILER_FLAGS= + fi +else + AC_CHECK_DECL([__SUNPRO_CC], [SUNCC=yes], [SUNCC=no]) + + # Set up for sun CC compiler + if test x$SUNCC = xno; then + if test "${enableval}" = yes; then + WARNING_FLAGS=+w + fi + CXXFLAGS="$CXXFLAGS -library=stlport4 -mt" + LD="$CXX" + LDFLAGS="$LDFLAGS -library=stlport4 -mt" + fi fi AC_DISABLE_STATIC @@ -278,6 +292,35 @@ AC_ARG_WITH([rdma], LIBS=$tmp_LIBS AM_CONDITIONAL([RDMA], [test x$with_RDMA = xyes]) +poller=no +AC_ARG_WITH([poller], + [AS_HELP_STRING([--with-poller], [The low level poller implementation: poll/solaris-ecf/epoll])], + [case ${withval} in + poll) + AC_CHECK_HEADERS([sys/poll.h],[poller=no],[AC_MSG_ERROR([Can't find poll.h header file for poll])]) + ;; + solaris-ecf) + AC_CHECK_HEADERS([port.h],[poller=solaris-ecf],[AC_MSG_ERROR([Can't find port.h header file for solaris-ecf])]) + ;; + epoll) + AC_CHECK_HEADERS([sys/epoll.h],[poller=epoll],[AC_MSG_ERROR([Can't find epoll.h header file for epoll])]) + ;; + esac], + [ + AC_CHECK_HEADERS([sys/poll.h],[poller=no],) + AC_CHECK_HEADERS([port.h],[poller=solaris-ecf],) + AC_CHECK_HEADERS([sys/epoll.h],[poller=epoll],) + ] +) + +AM_CONDITIONAL([HAVE_ECF], [test x$poller = xsolaris-ecf]) +AM_CONDITIONAL([HAVE_EPOLL], [test x$poller = xepoll]) + +#Filter not implemented or invalid mechanisms +if test $poller = xno; then + AC_MSG_ERROR([Polling mechanism not implemented for $host]) +fi + # Files to generate AC_CONFIG_FILES([ qpidc.spec diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 592ccf8a76..445f5eae0d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -70,7 +70,6 @@ sbin_PROGRAMS = qpidd qpidd_SOURCES = qpidd.cpp posix_plat_src = \ - qpid/sys/epoll/EpollPoller.cpp \ qpid/sys/posix/IOHandle.cpp \ qpid/sys/posix/Socket.cpp \ qpid/sys/posix/AsynchIO.cpp \ @@ -90,7 +89,15 @@ posix_plat_hdr = \ qpid/sys/posix/Fork.h \ qpid/sys/posix/LockFile.h -platform_src = $(posix_plat_src) +if HAVE_EPOLL + poller = qpid/sys/epoll/EpollPoller.cpp +endif + +if HAVE_ECF + poller = qpid/sys/solaris/ECFPoller.cpp +endif + +platform_src = $(posix_plat_src) $(poller) platform_hdr = $(posix_plat_hdr) lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la diff --git a/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/cpp/src/qpid/sys/solaris/ECFPoller.cpp new file mode 100644 index 0000000000..fefa21dae1 --- /dev/null +++ b/cpp/src/qpid/sys/solaris/ECFPoller.cpp @@ -0,0 +1,301 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Logger.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/DeletionManager.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include <port.h> +#include <poll.h> +#include <errno.h> + +#include <assert.h> +#include <vector> +#include <exception> + + +//TODO: Remove this +#include "qpid/sys/Dispatcher.h" + +namespace qpid { +namespace sys { + +// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used +DeletionManager<PollerHandle> PollerHandleDeletionManager; + +// Instantiate (and define) class static for DeletionManager +template <> +DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0); + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE, + HUNGUP, + MONITORED_HUNGUP + }; + + int fd; + uint32_t events; + FDStat stat; + Mutex lock; + + PollerHandlePrivate(int f) : + fd(f), + events(0), + stat(ABSENT) { + } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return stat == MONITORED_HUNGUP || stat == HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toFd(h.impl))) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +void PollerHandle::deferDelete() { + PollerHandleDeletionManager.markForDeletion(this); +} + +/** + * Concrete implementation of Poller to use the Solaris Event Completion + * Framework interface + */ +class PollerPrivate { + friend class Poller; + + const int portId; + + static uint32_t directionToPollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::IN: return POLLIN; + case Poller::OUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; + } + } + + static Poller::EventType pollToDirection(uint32_t events) { + uint32_t e = events & (POLLIN | POLLOUT); + switch (e) { + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; + } + } + + PollerPrivate() : + portId(::port_create()) { + } + + ~PollerPrivate() { + } +}; + +void Poller::addFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + + uint32_t events = 0; + + if (eh.isIdle()) { + events = PollerPrivate::directionToPollEvent(dir); + } else { + assert(eh.isActive()); + events = eh.events | PollerPrivate::directionToPollEvent(dir); + } + + //port_associate can be used to add an association or modify an + //existing one + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, events, &handle)); + eh.events = events; + eh.setActive(); + QPID_LOG(trace, "Poller::addFd(handle=" << &handle + << "[" << typeid(&handle).name() + << "], fd=" << eh.fd << ")"); + //assert(dynamic_cast<DispatchHandle*>(&handle)); +} + +void Poller::delFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd); + //Allow closing an invalid fd, allowing users to close fd before + //doing delFd() + if (rc == -1 && errno != EBADFD) { + QPID_POSIX_CHECK(rc); + } + eh.setIdle(); + QPID_LOG(trace, "Poller::delFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +// modFd is equivalent to delFd followed by addFd +void Poller::modFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + eh.events = PollerPrivate::directionToPollEvent(dir); + + //If fd is already associated, events and user arguments are updated + //So, no need to check if fd is already associated + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle)); + eh.setActive(); + QPID_LOG(trace, "Poller::modFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +void Poller::rearmFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.isInactive()); + + QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle)); + eh.setActive(); + QPID_LOG(trace, "Poller::rearmdFd(handle=" << &handle + << ", fd=" << eh.fd << ")"); +} + +void Poller::shutdown() { + //Send an Alarm to the port + //We need to send a nonzero event mask, using POLLHUP, but + //The wait method will only look for a PORT_ALERT_SET + QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL)); + QPID_LOG(trace, "Poller::shutdown"); +} + +Poller::Event Poller::wait(Duration timeout) { + timespec_t tout; + timespec_t* ptout = NULL; + port_event_t pe; + + if (timeout != TIME_INFINITE) { + tout.tv_sec = 0; + tout.tv_nsec = timeout; + ptout = &tout; + } + + do { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + QPID_LOG(trace, "About to enter port_get. Thread " + << pthread_self() + << ", timeout=" << timeout); + + int rc = ::port_get(impl->portId, &pe, ptout); + + if (rc < 0) { + switch (errno) { + case EINTR: + continue; + case ETIME: + return Event(0, TIMEOUT); + default: + QPID_POSIX_CHECK(rc); + } + } else { + //We use alert mode to notify the shutdown of the Poller + if (pe.portev_source == PORT_SOURCE_ALERT) { + return Event(0, SHUTDOWN); + } + if (pe.portev_source == PORT_SOURCE_FD) { + PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user); + PollerHandlePrivate& eh = *handle->impl; + ScopedLock<Mutex> l(eh.lock); + QPID_LOG(trace, "About to send handle: " << handle); + + if (eh.isActive()) { + if (pe.portev_events & POLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } + QPID_LOG(trace, "Sending event (thread: " + << pthread_self() << ") for handle " << handle + << ", direction= " + << PollerPrivate::pollToDirection(pe.portev_events)); + return Event(handle, PollerPrivate::pollToDirection(pe.portev_events)); + } + } + } + } while (true); +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} |