summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-06-01 18:59:52 +0000
committerGordon Sim <gsim@apache.org>2010-06-01 18:59:52 +0000
commit292347fdbd7ef8e204ff3486b21497f33bff50fd (patch)
treedde2fd77c7d08fc98600b0bc146d1f74febc4fcc
parentfff0a43d69db6e62f1a21d52f752bf59c35fbd2a (diff)
downloadqpid-python-292347fdbd7ef8e204ff3486b21497f33bff50fd.tar.gz
QPID-2004: Send disconnected event to any handles still registered after shutdown to ensure they can clean themselves up
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@950205 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/PollerDispatch.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/Poller.h2
-rw-r--r--qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp46
-rw-r--r--qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp5
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IocpPoller.cpp5
5 files changed, 62 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
index a839ef863b..b8d94b95a5 100644
--- a/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
+++ b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
@@ -60,8 +60,10 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) {
// Entry point: called if disconnected from CPG.
void PollerDispatch::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, "Disconnected from cluster");
- onError();
+ if (!poller->hasShutdown()) {
+ QPID_LOG(critical, "Disconnected from cluster");
+ onError();
+ }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/sys/Poller.h b/qpid/cpp/src/qpid/sys/Poller.h
index 413d4242b8..47b7606a16 100644
--- a/qpid/cpp/src/qpid/sys/Poller.h
+++ b/qpid/cpp/src/qpid/sys/Poller.h
@@ -99,6 +99,8 @@ public:
QPID_COMMON_EXTERN void monitorHandle(PollerHandle& handle, Direction dir);
QPID_COMMON_EXTERN void unmonitorHandle(PollerHandle& handle, Direction dir);
QPID_COMMON_EXTERN Event wait(Duration timeout = TIME_INFINITE);
+
+ QPID_COMMON_EXTERN bool hasShutdown();
};
/**
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 7b0d0aaa7f..9ae9bcefb5 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -22,6 +22,7 @@
#include "qpid/sys/Poller.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
@@ -33,6 +34,7 @@
#include <assert.h>
#include <queue>
+#include <set>
#include <exception>
namespace qpid {
@@ -156,6 +158,37 @@ PollerHandle::~PollerHandle() {
PollerHandleDeletionManager.markForDeletion(impl);
}
+class HandleSet
+{
+ Mutex lock;
+ std::set<PollerHandle*> handles;
+ public:
+ void add(PollerHandle*);
+ void remove(PollerHandle*);
+ void cleanup();
+};
+
+void HandleSet::add(PollerHandle* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.insert(h);
+}
+void HandleSet::remove(PollerHandle* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.erase(h);
+}
+void HandleSet::cleanup()
+{
+ // Inform all registered handles of disconnection
+ std::set<PollerHandle*> copy;
+ handles.swap(copy);
+ for (std::set<PollerHandle*>::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+ Poller::Event event(*i, Poller::DISCONNECTED);
+ event.process();
+ }
+}
+
/**
* Concrete implementation of Poller to use the Linux specific epoll
* interface
@@ -230,6 +263,8 @@ class PollerPrivate {
bool isShutdown;
InterruptHandle interruptHandle;
::sigset_t sigMask;
+ HandleSet registeredHandles;
+ AtomicCount threadCount;
static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
switch (dir) {
@@ -308,6 +343,7 @@ void Poller::registerHandle(PollerHandle& handle) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
+ impl->registeredHandles.add(&handle);
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
eh.setActive();
@@ -318,6 +354,7 @@ void Poller::unregisterHandle(PollerHandle& handle) {
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
+ impl->registeredHandles.remove(&handle);
int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
@@ -475,6 +512,7 @@ void Poller::run() {
::sigfillset(&ss);
::pthread_sigmask(SIG_SETMASK, &ss, 0);
+ ++(impl->threadCount);
do {
Event event = wait();
@@ -486,6 +524,8 @@ void Poller::run() {
switch (event.type) {
case SHUTDOWN:
PollerHandleDeletionManager.destroyThreadState();
+ //last thread to respond to shutdown cleans up:
+ if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup();
return;
default:
// This should be impossible
@@ -497,6 +537,12 @@ void Poller::run() {
QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
}
PollerHandleDeletionManager.destroyThreadState();
+ --(impl->threadCount);
+}
+
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
}
Poller::Event Poller::wait(Duration timeout) {
diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
index f12012cbb0..06d542c938 100644
--- a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
@@ -293,6 +293,11 @@ void Poller::shutdown() {
impl->interrupt();
}
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
bool Poller::interrupt(PollerHandle& handle) {
PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
index 4fcc9155f1..d326ab02ac 100755
--- a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -100,6 +100,11 @@ void Poller::shutdown() {
PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
}
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
bool Poller::interrupt(PollerHandle&) {
return false; // There's no concept of a registered handle.
}