summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClifford Allan Jansen <cliffjansen@apache.org>2012-02-09 16:31:45 +0000
committerClifford Allan Jansen <cliffjansen@apache.org>2012-02-09 16:31:45 +0000
commitb76d90af9f4f80f90f03d26b592b3053f7de77ee (patch)
tree4a5b0beb6989b30daa9086ac70cf7dd305cc4790
parent6ac39fefbb2d113c0f4413cd04d2019d92125292 (diff)
downloadqpid-python-b76d90af9f4f80f90f03d26b592b3053f7de77ee.tar.gz
QPID-3571 PosixPoller implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1242380 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/sys/posix/PosixPoller.cpp793
1 files changed, 793 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/posix/PosixPoller.cpp b/cpp/src/qpid/sys/posix/PosixPoller.cpp
new file mode 100644
index 0000000000..eb0c3384d1
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/PosixPoller.cpp
@@ -0,0 +1,793 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicCount.h"
+#include "qpid/sys/DeletionManager.h"
+#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Condition.h"
+
+#include <poll.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+#include <exception>
+
+/*
+ *
+ * This is a qpid::sys::Poller implementation for Posix systems.
+ *
+ * This module follows the structure of the Linux EpollPoller as closely as possible
+ * to simplify maintainability. Noteworthy differences:
+ *
+ * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll()
+ * returns one or more events to a single caller. The EventStream class layers a
+ * "one event per call" view of the poll() result to multiple threads.
+ *
+ * The HandleSet is the master set of in-use PollerHandles. The EventStream
+ * maintains a snapshot copy taken just before the call to poll() that remains static
+ * until all flagged events have been processed.
+ *
+ * There is an additional window where the PollerHandlePrivate class may survive the
+ * parent PollerHandle destructor, i.e. between snapshots.
+ *
+ * Safe interrupting of the Poller is implemented using the "self-pipe trick".
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
+DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
+
+// Instantiate (and define) class static for DeletionManager
+template <>
+DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerPrivate;
+ friend class PollerHandle;
+ friend class HandleSet;
+
+ enum FDStat {
+ ABSENT,
+ MONITORED,
+ INACTIVE,
+ HUNGUP,
+ MONITORED_HUNGUP,
+ INTERRUPTED,
+ INTERRUPTED_HUNGUP,
+ DELETED
+ };
+
+ short events;
+ const IOHandlePrivate* ioHandle;
+ PollerHandle* pollerHandle;
+ FDStat stat;
+ Mutex lock;
+
+ PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
+ events(0),
+ ioHandle(h),
+ pollerHandle(p),
+ stat(ABSENT) {
+ }
+
+ int fd() const {
+ return toFd(ioHandle);
+ }
+
+ bool isActive() const {
+ return stat == MONITORED || stat == MONITORED_HUNGUP;
+ }
+
+ void setActive() {
+ stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
+ ? MONITORED_HUNGUP
+ : MONITORED;
+ }
+
+ bool isInactive() const {
+ return stat == INACTIVE || stat == HUNGUP;
+ }
+
+ void setInactive() {
+ stat = INACTIVE;
+ }
+
+ bool isIdle() const {
+ return stat == ABSENT;
+ }
+
+ void setIdle() {
+ stat = ABSENT;
+ }
+
+ bool isHungup() const {
+ return
+ stat == MONITORED_HUNGUP ||
+ stat == HUNGUP ||
+ stat == INTERRUPTED_HUNGUP;
+ }
+
+ void setHungup() {
+ assert(stat == MONITORED);
+ stat = HUNGUP;
+ }
+
+ bool isInterrupted() const {
+ return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
+ }
+
+ void setInterrupted() {
+ stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+ ? INTERRUPTED_HUNGUP
+ : INTERRUPTED;
+ }
+
+ bool isDeleted() const {
+ return stat == DELETED;
+ }
+
+ void setDeleted() {
+ stat = DELETED;
+ }
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(h.impl, this))
+{}
+
+PollerHandle::~PollerHandle() {
+ {
+ ScopedLock<Mutex> l(impl->lock);
+ if (impl->isDeleted()) {
+ return;
+ }
+ impl->pollerHandle = 0;
+ if (impl->isInterrupted()) {
+ impl->setDeleted();
+ return;
+ }
+ assert(impl->isIdle());
+ impl->setDeleted();
+ }
+ PollerHandleDeletionManager.markForDeletion(impl);
+}
+
+class HandleSet
+{
+ Mutex lock;
+ bool stale;
+ std::set<PollerHandlePrivate*> handles;
+ public:
+ HandleSet() : stale(true) {}
+ void add(PollerHandlePrivate*);
+ void remove(PollerHandlePrivate*);
+ void cleanup();
+ bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct ::pollfd>&);
+ void setStale();
+};
+
+void HandleSet::add(PollerHandlePrivate* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.insert(h);
+}
+void HandleSet::remove(PollerHandlePrivate* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.erase(h);
+}
+void HandleSet::cleanup()
+{
+ // Inform all registered handles of disconnection
+ std::set<PollerHandlePrivate*> copy;
+ handles.swap(copy);
+ for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+ PollerHandlePrivate& eh = **i;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ if (!eh.isDeleted()) {
+ Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED);
+ event.process();
+ }
+ }
+ }
+}
+void HandleSet::setStale()
+{
+ // invalidate cached pollfds for next snapshot
+ ScopedLock<Mutex> l(lock);
+ stale = true;
+}
+
+/**
+ * Concrete implementation of Poller to use Posix poll()
+ * interface
+ */
+class PollerPrivate {
+ friend class Poller;
+ friend class EventStream;
+ friend class HandleSet;
+
+ class SignalPipe {
+ /**
+ * Used to wakeup a thread in ::poll()
+ */
+ int fds[2];
+ bool signaled;
+ bool permanent;
+ Mutex lock;
+ public:
+ SignalPipe() : signaled(false), permanent(false) {
+ QPID_POSIX_CHECK(::pipe(fds));
+ }
+
+ ~SignalPipe() {
+ ::close(fds[0]);
+ ::close(fds[1]);
+ }
+
+ int getFD() {
+ return fds[0];
+ }
+
+ bool isSet() {
+ return signaled;
+ }
+
+ void set() {
+ ScopedLock<Mutex> l(lock);
+ if (signaled)
+ return;
+ signaled = true;
+ QPID_POSIX_CHECK(::write(fds[1], " ", 1));
+ }
+
+ void reset() {
+ if (permanent)
+ return;
+ ScopedLock<Mutex> l(lock);
+ if (signaled) {
+ char ignore;
+ QPID_POSIX_CHECK(::read(fds[0], &ignore, 1));
+ signaled = false;
+ }
+ }
+
+ void setPermanently() {
+ // async signal safe calls only. No locking.
+ permanent = true;
+ signaled = true;
+ QPID_POSIX_CHECK(::write(fds[1], " ", 2));
+ // poll() should never block now
+ }
+ };
+
+ // Collect pending events and serialize access. Maintain array of pollfd structs.
+ class EventStream {
+ typedef Poller::Event Event;
+ PollerPrivate& pollerPrivate;
+ SignalPipe& signalPipe;
+ std::queue<PollerHandlePrivate*> interruptedHandles;
+ std::vector<struct ::pollfd> pollfds;
+ std::vector<PollerHandlePrivate*> pollHandles;
+ Mutex streamLock;
+ Mutex serializeLock;
+ Condition serializer;
+ bool busy;
+ int currentPollfd;
+ int pollCount;
+ int waiters;
+
+ public:
+
+ EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false),
+ currentPollfd(0), pollCount(0), waiters(0) {
+ // The signal pipe is the first element of pollfds and pollHandles
+ pollfds.reserve(8);
+ pollfds.resize(1);
+ pollfds[0].fd = pollerPrivate.signalPipe.getFD();
+ pollfds[0].events = POLLIN;
+ pollfds[0].revents = 0;
+
+ pollHandles.reserve(8);
+ pollHandles.resize(1);
+ pollHandles[0] = 0;
+ }
+
+ void addInterrupt(PollerHandle& handle) {
+ ScopedLock<Mutex> l(streamLock);
+ interruptedHandles.push(handle.impl);
+ }
+
+ // Serialize access to the stream.
+ Event next(Duration timeout) {
+ AbsTime targetTimeout =
+ (timeout == TIME_INFINITE) ?
+ FAR_FUTURE :
+ AbsTime(now(), timeout);
+
+
+ ScopedLock<Mutex> l(serializeLock);
+ Event event(0, Poller::INVALID);
+ while (busy) {
+ waiters++;
+ bool timedout = !serializer.wait(serializeLock, targetTimeout);
+ waiters--;
+
+ if (busy && timedout) {
+ return Event(0, Poller::TIMEOUT);
+ }
+ }
+ busy = true;
+ {
+ ScopedUnlock<Mutex> ul(serializeLock);
+ event = getEvent(targetTimeout);
+ }
+ busy = false;
+
+ if (waiters > 0)
+ serializer.notify();
+ return event;
+ }
+
+ Event getEvent(AbsTime targetTimeout) {
+ bool timeoutPending = false;
+
+ ScopedLock<Mutex> l(streamLock); // hold lock except for poll()
+
+ // loop until poll event, async interrupt, or timeout
+ while (true) {
+
+ // first check for any interrupts
+ while (interruptedHandles.size() > 0) {
+ PollerHandlePrivate& eh = *interruptedHandles.front();
+ interruptedHandles.pop();
+ {
+ ScopedLock<Mutex> lk(eh.lock);
+ if (!eh.isDeleted()) {
+ if (!eh.isIdle()) {
+ eh.setInactive();
+ }
+
+ // nullify the corresponding pollfd event, if any
+ int ehfd = eh.fd();
+ std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front
+ for (; i != pollfds.end(); i++) {
+ if (i->fd == ehfd) {
+ i->events = 0;
+ if (i->revents) {
+ i->revents = 0;
+ pollCount--;
+ }
+ break;
+ }
+ }
+ return Event(eh.pollerHandle, Poller::INTERRUPTED);
+ }
+ }
+ PollerHandleDeletionManager.markForDeletion(&eh);
+ }
+
+ // Check for shutdown
+ if (pollerPrivate.isShutdown) {
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ return Event(0, Poller::SHUTDOWN);
+ }
+
+ // search for any remaining events from earlier poll()
+ int nfds = pollfds.size();
+ while ((pollCount > 0) && (currentPollfd < nfds)) {
+ int index = currentPollfd++;
+ short evt = pollfds[index].revents;
+ if (evt != 0) {
+ pollCount--;
+ PollerHandlePrivate& eh = *pollHandles[index];
+ ScopedLock<Mutex> l(eh.lock);
+ // stop polling this handle until resetMode()
+ pollfds[index].events = 0;
+
+ // the handle could have gone inactive since snapshot taken
+ if (eh.isActive()) {
+ PollerHandle* handle = eh.pollerHandle;
+ assert(handle);
+
+ // If the connection has been hungup we could still be readable
+ // (just not writable), allow us to readable until we get here again
+ if (evt & POLLHUP) {
+ if (eh.isHungup()) {
+ eh.setInactive();
+ // Don't set up last Handle so that we don't reset this handle
+ // on re-entering Poller::wait. This means that we will never
+ // be set active again once we've returned disconnected, and so
+ // can never be returned again.
+ return Event(handle, Poller::DISCONNECTED);
+ }
+ eh.setHungup();
+ } else {
+ eh.setInactive();
+ }
+ return Event(handle, PollerPrivate::epollToDirection(evt));
+ }
+ }
+ }
+
+ if (timeoutPending) {
+ return Event(0, Poller::TIMEOUT);
+ }
+
+ // no outstanding events, poll() for more
+ {
+ ScopedUnlock<Mutex> ul(streamLock);
+
+ bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
+ if (refreshed) {
+ // we just drained all interruptedHandles and got a fresh snapshot
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ }
+
+ if (!signalPipe.isSet()) {
+ int timeoutMs = -1;
+ if (!(targetTimeout == FAR_FUTURE)) {
+ timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC;
+ if (timeoutMs < 0)
+ timeoutMs = 0;
+ }
+
+ pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs);
+
+ if (pollCount ==-1 && errno != EINTR) {
+ QPID_POSIX_CHECK(pollCount);
+ }
+ else if (pollCount == 0) {
+ // timeout, unless shutdown or interrupt arrives in another thread
+ timeoutPending = true;
+ }
+ else {
+ if (pollfds[0].revents) {
+ pollCount--; // signal pipe doesn't count
+ }
+ }
+ }
+ else
+ pollCount = 0;
+ signalPipe.reset();
+ }
+ currentPollfd = 1;
+ }
+ }
+ };
+
+ bool isShutdown;
+ HandleSet registeredHandles;
+ AtomicCount threadCount;
+ SignalPipe signalPipe;
+ EventStream eventStream;
+
+ static short directionToEpollEvent(Poller::Direction dir) {
+ switch (dir) {
+ case Poller::INPUT: return POLLIN;
+ case Poller::OUTPUT: return POLLOUT;
+ case Poller::INOUT: return POLLIN | POLLOUT;
+ default: return 0;
+ }
+ }
+
+ static Poller::EventType epollToDirection(short events) {
+ // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
+ // can give you both!
+ events = (events & POLLHUP) ? events & ~POLLOUT : events;
+ short e = events & (POLLIN | POLLOUT);
+ switch (e) {
+ case POLLIN: return Poller::READABLE;
+ case POLLOUT: return Poller::WRITABLE;
+ case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+ default:
+ return (events & (POLLHUP | POLLERR)) ?
+ Poller::DISCONNECTED : Poller::INVALID;
+ }
+ }
+
+ PollerPrivate() :
+ isShutdown(false), eventStream(this) {
+ }
+
+ ~PollerPrivate() {}
+
+ void resetMode(PollerHandlePrivate& handle);
+
+ void interrupt() {
+ signalPipe.set();
+ }
+
+ void interruptAll() {
+ // be async signal safe
+ signalPipe.setPermanently();
+ }
+};
+
+
+void Poller::registerHandle(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.isIdle());
+
+ eh.setActive();
+ impl->registeredHandles.add(handle.impl);
+ // not stale until monitored
+}
+
+void Poller::unregisterHandle(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ eh.setIdle();
+ impl->registeredHandles.remove(handle.impl);
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+ PollerHandle* ph;
+ {
+ // Called after an event has been processed for a handle
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isActive());
+
+ if (eh.isIdle() || eh.isDeleted()) {
+ return;
+ }
+
+ if (eh.events==0) {
+ eh.setActive();
+ return;
+ }
+
+ if (!eh.isInterrupted()) {
+ // Handle still in use, allow events to resume.
+ eh.setActive();
+ registeredHandles.setStale();
+ // Ouch. This scales poorly for large handle sets.
+ // TODO: avoid new snapshot, perhaps create an index to pollfds or a
+ // pending reset queue to be processed before each poll(). However, the real
+ // scalable solution is to implement the OS-specific epoll equivalent.
+ interrupt();
+ return;
+ }
+ ph = eh.pollerHandle;
+ }
+
+ eventStream.addInterrupt(*ph);
+ interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ short oldEvents = eh.events;
+ eh.events |= PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
+ // tell polling thread to update its pollfds
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ short oldEvents = eh.events;
+ eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void Poller::shutdown() {
+ // NB: this function must be async-signal safe, it must not
+ // call any function that is not async-signal safe.
+
+ // Allow sloppy code to shut us down more than once
+ if (impl->isShutdown)
+ return;
+
+ // Don't use any locking here - isShutdown will be visible to all
+ // after the write() anyway (it's a memory barrier)
+ impl->isShutdown = true;
+
+ impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+ {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ if (eh.isIdle() || eh.isDeleted()) {
+ return false;
+ }
+
+ if (eh.isInterrupted()) {
+ return true;
+ }
+
+ if (eh.isInactive()) {
+ eh.setInterrupted();
+ return true;
+ }
+ eh.setInterrupted();
+ eh.events = 0;
+ }
+
+ impl->registeredHandles.setStale();
+ impl->eventStream.addInterrupt(handle);
+ impl->interrupt();
+ return true;
+}
+
+void Poller::run() {
+ // Ensure that we exit thread responsibly under all circumstances
+ try {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ ++(impl->threadCount);
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ //last thread to respond to shutdown cleans up:
+ if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup();
+ PollerHandleDeletionManager.destroyThreadState();
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+ } while (true);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
+ }
+ PollerHandleDeletionManager.destroyThreadState();
+ --(impl->threadCount);
+}
+
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
+Poller::Event Poller::wait(Duration timeout) {
+ static __thread PollerHandlePrivate* lastReturnedHandle = 0;
+
+ if (lastReturnedHandle) {
+ impl->resetMode(*lastReturnedHandle);
+ lastReturnedHandle = 0;
+ }
+
+ Event event = impl->eventStream.next(timeout);
+
+ switch (event.type) {
+ case INTERRUPTED:
+ case READABLE:
+ case WRITABLE:
+ case READ_WRITABLE:
+ lastReturnedHandle = event.handle->impl;
+ break;
+ default:
+ ;
+ }
+
+ return event;
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+
+bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , std::vector<struct ::pollfd>& fds)
+{
+ // Element 0 of the vectors is always the signal pipe, leave undisturbed
+ {
+ ScopedLock<Mutex> l(lock);
+ if (!stale)
+ return false; // no refresh done
+
+ hs.resize(1);
+ for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) {
+ hs.push_back(*i);
+ }
+ stale = false;
+ // have copy of handle set (in vector form), drop the lock and build the pollfds
+ }
+
+ // sync pollfds to same sizing as the handles
+ int sz = hs.size();
+ fds.resize(sz);
+
+ for (int j = 1; j < sz; ++j) {
+ // create a pollfd entry for each handle
+ struct ::pollfd& pollfd = fds[j];
+ PollerHandlePrivate& eh = *hs[j];
+ ScopedLock<Mutex> lk(eh.lock);
+
+ if (!eh.isInactive() && !eh.isDeleted()) {
+ pollfd.fd = eh.fd();
+ pollfd.events = eh.events;
+ } else {
+ pollfd.fd = -1; // tell poll() to ignore this fd
+ pollfd.events = 0;
+ }
+ }
+ return true;
+}
+
+
+}}