summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp371
1 files changed, 0 insertions, 371 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
deleted file mode 100644
index a1e624ea75..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/IOHandle.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/DeletionManager.h"
-#include "qpid/sys/posix/check.h"
-#include "qpid/sys/posix/PrivatePosix.h"
-
-#include <sys/epoll.h>
-#include <errno.h>
-
-#include <assert.h>
-#include <vector>
-#include <exception>
-
-namespace qpid {
-namespace sys {
-
-// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
-DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
-
-// Instantiate (and define) class static for DeletionManager
-template <>
-DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
-
-class PollerHandlePrivate {
- friend class Poller;
- friend class PollerHandle;
-
- enum FDStat {
- ABSENT,
- MONITORED,
- INACTIVE,
- HUNGUP,
- MONITORED_HUNGUP,
- DELETED
- };
-
- int fd;
- ::__uint32_t events;
- PollerHandle* pollerHandle;
- FDStat stat;
- Mutex lock;
-
- PollerHandlePrivate(int f, PollerHandle* p) :
- fd(f),
- events(0),
- pollerHandle(p),
- stat(ABSENT) {
- }
-
- bool isActive() const {
- return stat == MONITORED || stat == MONITORED_HUNGUP;
- }
-
- void setActive() {
- stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
- }
-
- bool isInactive() const {
- return stat == INACTIVE || stat == HUNGUP;
- }
-
- void setInactive() {
- stat = INACTIVE;
- }
-
- bool isIdle() const {
- return stat == ABSENT;
- }
-
- void setIdle() {
- stat = ABSENT;
- }
-
- bool isHungup() const {
- return stat == MONITORED_HUNGUP || stat == HUNGUP;
- }
-
- void setHungup() {
- assert(stat == MONITORED);
- stat = HUNGUP;
- }
-
- bool isDeleted() const {
- return stat == DELETED;
- }
-
- void setDeleted() {
- stat = DELETED;
- }
-};
-
-PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
-{}
-
-PollerHandle::~PollerHandle() {
- {
- ScopedLock<Mutex> l(impl->lock);
- if (impl->isDeleted()) {
- return;
- }
- if (impl->isActive()) {
- impl->setDeleted();
- }
- }
- PollerHandleDeletionManager.markForDeletion(impl);
-}
-
-/**
- * Concrete implementation of Poller to use the Linux specific epoll
- * interface
- */
-class PollerPrivate {
- friend class Poller;
-
- static const int DefaultFds = 256;
-
- struct ReadablePipe {
- int fds[2];
-
- /**
- * This encapsulates an always readable pipe which we can add
- * to the epoll set to force epoll_wait to return
- */
- ReadablePipe() {
- QPID_POSIX_CHECK(::pipe(fds));
- // Just write the pipe's fds to the pipe
- QPID_POSIX_CHECK(::write(fds[1], fds, 2));
- }
-
- ~ReadablePipe() {
- ::close(fds[0]);
- ::close(fds[1]);
- }
-
- int getFD() {
- return fds[0];
- }
- };
-
- static ReadablePipe alwaysReadable;
-
- const int epollFd;
- bool isShutdown;
-
- static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
- switch (dir) {
- case Poller::INPUT: return ::EPOLLIN;
- case Poller::OUTPUT: return ::EPOLLOUT;
- case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
- default: return 0;
- }
- }
-
- static Poller::EventType epollToDirection(::__uint32_t events) {
- // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
- // can give you both!
- events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events;
- ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
- switch (e) {
- case ::EPOLLIN: return Poller::READABLE;
- case ::EPOLLOUT: return Poller::WRITABLE;
- case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE;
- default:
- return (events & (::EPOLLHUP | ::EPOLLERR)) ?
- Poller::DISCONNECTED : Poller::INVALID;
- }
- }
-
- PollerPrivate() :
- epollFd(::epoll_create(DefaultFds)),
- isShutdown(false) {
- QPID_POSIX_CHECK(epollFd);
- }
-
- ~PollerPrivate() {
- // It's probably okay to ignore any errors here as there can't be data loss
- ::close(epollFd);
- }
-};
-
-PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
-
-void Poller::addFd(PollerHandle& handle, Direction dir) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- ::epoll_event epe;
- int op;
-
- if (eh.isIdle()) {
- op = EPOLL_CTL_ADD;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
- } else {
- assert(eh.isActive());
- op = EPOLL_CTL_MOD;
- epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
- }
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
-
- // Record monitoring state of this fd
- eh.events = epe.events;
- eh.setActive();
-}
-
-void Poller::delFd(PollerHandle& handle) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
- // Ignore EBADF since deleting a nonexistent fd has the overall required result!
- // And allows the case where a sloppy program closes the fd and then does the delFd()
- if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
- }
- eh.setIdle();
-}
-
-// modFd is equivalent to delFd followed by addFd
-void Poller::modFd(PollerHandle& handle, Direction dir) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isIdle());
-
- ::epoll_event epe;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- // Record monitoring state of this fd
- eh.events = epe.events;
- eh.setActive();
-}
-
-void Poller::rearmFd(PollerHandle& handle) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(eh.isInactive());
-
- ::epoll_event epe;
- epe.events = eh.events;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- eh.setActive();
-}
-
-void Poller::shutdown() {
- // NB: this function must be async-signal safe, it must not
- // call any function that is not async-signal safe.
-
- // Allow sloppy code to shut us down more than once
- if (impl->isShutdown)
- return;
-
- // Don't use any locking here - isshutdown will be visible to all
- // after the epoll_ctl() anyway (it's a memory barrier)
- impl->isShutdown = true;
-
- // Add always readable fd to epoll (not EPOLLONESHOT)
- int fd = impl->alwaysReadable.getFD();
- ::epoll_event epe;
- epe.events = ::EPOLLIN;
- epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now
- epe.data.ptr = 0;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
-}
-
-Poller::Event Poller::wait(Duration timeout) {
- epoll_event epe;
- int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
-
- // Repeat until we weren't interupted
- do {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
-
- if (impl->isShutdown) {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- return Event(0, SHUTDOWN);
- }
-
- if (rc ==-1 && errno != EINTR) {
- QPID_POSIX_CHECK(rc);
- } else if (rc > 0) {
- assert(rc == 1);
- PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr);
-
- ScopedLock<Mutex> l(eh.lock);
-
- // the handle could have gone inactive since we left the epoll_wait
- if (eh.isActive()) {
- PollerHandle* handle = eh.pollerHandle;
-
- // If the connection has been hungup we could still be readable
- // (just not writable), allow us to readable until we get here again
- if (epe.events & ::EPOLLHUP) {
- if (eh.isHungup()) {
- return Event(handle, DISCONNECTED);
- }
- eh.setHungup();
- } else {
- eh.setInactive();
- }
- return Event(handle, PollerPrivate::epollToDirection(epe.events));
- } else if (eh.isDeleted()) {
- // The handle has been deleted whilst still active and so must be removed
- // from the poller
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
- // Ignore EBADF since it's quite likely that we could race with closing the fd
- if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
- }
- }
- }
- // We only get here if one of the following:
- // * epoll_wait was interrupted by a signal
- // * epoll_wait timed out
- // * the state of the handle changed after being returned by epoll_wait
- //
- // The only things we can do here are return a timeout or wait more.
- // Obviously if we timed out we return timeout; if the wait was meant to
- // be indefinite then we should never return with a time out so we go again.
- // If the wait wasn't indefinite, but we were interrupted then we have to return
- // with a timeout as we don't know how long we've waited so far and so we can't
- // continue the wait.
- if (rc == 0 || timeoutMs != -1) {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- return Event(0, TIMEOUT);
- }
- } while (true);
-}
-
-// Concrete constructors
-Poller::Poller() :
- impl(new PollerPrivate())
-{}
-
-Poller::~Poller() {
- delete impl;
-}
-
-}}