summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp178
1 files changed, 129 insertions, 49 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 42b5d8b1aa..d0623b86f4 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -46,6 +46,7 @@ DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerH
class PollerHandlePrivate {
friend class Poller;
+ friend class PollerPrivate;
friend class PollerHandle;
enum FDStat {
@@ -55,6 +56,7 @@ class PollerHandlePrivate {
HUNGUP,
MONITORED_HUNGUP,
INTERRUPTED,
+ INTERRUPTED_HUNGUP,
DELETED
};
@@ -76,7 +78,9 @@ class PollerHandlePrivate {
}
void setActive() {
- stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+ stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
+ ? MONITORED_HUNGUP
+ : MONITORED;
}
bool isInactive() const {
@@ -96,7 +100,10 @@ class PollerHandlePrivate {
}
bool isHungup() const {
- return stat == MONITORED_HUNGUP || stat == HUNGUP;
+ return
+ stat == MONITORED_HUNGUP ||
+ stat == HUNGUP ||
+ stat == INTERRUPTED_HUNGUP;
}
void setHungup() {
@@ -105,11 +112,13 @@ class PollerHandlePrivate {
}
bool isInterrupted() const {
- return stat == INTERRUPTED;
+ return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
}
void setInterrupted() {
- stat = INTERRUPTED;
+ stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+ ? INTERRUPTED_HUNGUP
+ : INTERRUPTED;
}
bool isDeleted() const {
@@ -131,13 +140,13 @@ PollerHandle::~PollerHandle() {
if (impl->isDeleted()) {
return;
}
+ impl->pollerHandle = 0;
if (impl->isInterrupted()) {
impl->setDeleted();
return;
}
- if (impl->isActive()) {
- impl->setDeleted();
- }
+ assert(impl->isIdle());
+ impl->setDeleted();
}
PollerHandleDeletionManager.markForDeletion(impl);
}
@@ -256,8 +265,13 @@ class PollerPrivate {
~PollerPrivate() {
// It's probably okay to ignore any errors here as there can't be data loss
::close(epollFd);
+
+ // Need to put the interruptHandle in idle state to delete it
+ static_cast<PollerHandle&>(interruptHandle).impl->setIdle();
}
-
+
+ void resetMode(PollerHandlePrivate& handle);
+
void interrupt() {
::epoll_event epe;
// Use EPOLLONESHOT so we only wake a single thread
@@ -279,74 +293,122 @@ class PollerPrivate {
PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-void Poller::addFd(PollerHandle& handle, Direction dir) {
+void Poller::registerHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- ::epoll_event epe;
- int op;
+ assert(eh.isIdle());
- if (eh.isIdle()) {
- op = EPOLL_CTL_ADD;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
- } else {
- assert(eh.isActive());
- op = EPOLL_CTL_MOD;
- epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
- }
+ ::epoll_event epe;
+ epe.events = ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
- // Record monitoring state of this fd
- eh.events = epe.events;
eh.setActive();
}
-void Poller::delFd(PollerHandle& handle) {
+void Poller::unregisterHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
+
int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
QPID_POSIX_CHECK(rc);
}
+
eh.setIdle();
}
-// modFd is equivalent to delFd followed by addFd
-void Poller::modFd(PollerHandle& handle, Direction dir) {
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+ PollerHandle* ph;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isActive());
+
+ if (eh.isIdle() || eh.isDeleted()) {
+ return;
+ }
+
+ if (eh.events==0) {
+ eh.setActive();
+ return;
+ }
+
+ if (!eh.isInterrupted()) {
+ ::epoll_event epe;
+ epe.events = eh.events | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &eh;
+
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+
+ eh.setActive();
+ return;
+ }
+ ph = eh.pollerHandle;
+ }
+
+ PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl;
+ ScopedLock<Mutex> l(ihp.lock);
+ interruptHandle.addHandle(*ph);
+ ihp.setActive();
+ interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
+ ::__uint32_t oldEvents = eh.events;
+ eh.events |= PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
::epoll_event epe;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- // Record monitoring state of this fd
- eh.events = epe.events;
- eh.setActive();
}
-void Poller::rearmFd(PollerHandle& handle) {
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.isInactive());
+ assert(!eh.isIdle());
+ ::__uint32_t oldEvents = eh.events;
+ eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
::epoll_event epe;
- epe.events = eh.events;
+ epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- eh.setActive();
}
void Poller::shutdown() {
@@ -368,14 +430,25 @@ bool Poller::interrupt(PollerHandle& handle) {
{
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- if (!eh.isActive()) {
+ if (eh.isIdle() || eh.isDeleted()) {
return false;
}
+
+ if (eh.isInterrupted()) {
+ return true;
+ }
+
+ // Stop monitoring handle for read or write
::epoll_event epe;
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
- eh.setInterrupted();
+
+ if (eh.isInactive()) {
+ eh.setInterrupted();
+ return true;
+ }
+ eh.setInterrupted();
}
PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -414,6 +487,7 @@ void Poller::run() {
}
Poller::Event Poller::wait(Duration timeout) {
+ static __thread PollerHandlePrivate* lastReturnedHandle = 0;
epoll_event epe;
int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
AbsTime targetTimeout =
@@ -421,6 +495,11 @@ Poller::Event Poller::wait(Duration timeout) {
FAR_FUTURE :
AbsTime(now(), timeout);
+ if (lastReturnedHandle) {
+ impl->resetMode(*lastReturnedHandle);
+ lastReturnedHandle = 0;
+ }
+
// Repeat until we weren't interrupted by signal
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
@@ -460,12 +539,19 @@ Poller::Event Poller::wait(Duration timeout) {
}
}
if (wrappedHandle) {
- ScopedLock<Mutex> l(wrappedHandle->impl->lock);
- if (!wrappedHandle->impl->isDeleted()) {
- wrappedHandle->impl->setInactive();
+ PollerHandlePrivate& eh = *wrappedHandle->impl;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ if (!eh.isDeleted()) {
+ if (!eh.isIdle()) {
+ eh.setInactive();
+ }
+ lastReturnedHandle = &eh;
+ assert(eh.pollerHandle == wrappedHandle);
return Event(wrappedHandle, INTERRUPTED);
}
- PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+ }
+ PollerHandleDeletionManager.markForDeletion(&eh);
}
continue;
}
@@ -482,6 +568,7 @@ Poller::Event Poller::wait(Duration timeout) {
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
PollerHandle* handle = eh.pollerHandle;
+ assert(handle);
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
@@ -493,15 +580,8 @@ Poller::Event Poller::wait(Duration timeout) {
} else {
eh.setInactive();
}
+ lastReturnedHandle = &eh;
return Event(handle, PollerPrivate::epollToDirection(epe.events));
- } else if (eh.isDeleted()) {
- // The handle has been deleted whilst still active and so must be removed
- // from the poller
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
- // Ignore EBADF since it's quite likely that we could race with closing the fd
- if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
- }
}
}
// We only get here if one of the following: