diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:33:15 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:33:15 +0000 |
commit | bbf31a9b3113ad6d37ed24d2ce767dd5f830afa3 (patch) | |
tree | 3a3f276254da99f54076d5ecbc35be1dcbb2d239 | |
parent | dc481af66b9ec3765416955fefbd80b5df6107f7 (diff) | |
download | qpid-python-bbf31a9b3113ad6d37ed24d2ce767dd5f830afa3.tar.gz |
Fix the behaviour of the EpollPoller when shutdowns and interrupts interact
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945899 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/PollerTest.cpp | 64 |
2 files changed, 56 insertions, 14 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index d7f64f3b4c..7b0d0aaa7f 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -536,6 +536,12 @@ Poller::Event Poller::wait(Duration timeout) { // Check if this is an interrupt PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle; if (dataPtr == &interruptHandle) { + // If we are shutting down we need to rearm the shutdown interrupt to + // ensure everyone still sees it. It's okay that this might be overridden + // below as we will be back here if it is. + if (impl->isShutdown) { + impl->interruptAll(); + } PollerHandle* wrappedHandle = 0; { ScopedLock<Mutex> l(interruptHandle.impl->lock); diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp index 11337d6be3..9fa5689c5f 100644 --- a/cpp/src/tests/PollerTest.cpp +++ b/cpp/src/tests/PollerTest.cpp @@ -69,20 +69,24 @@ int readALot(int fd) { return bytesRead; } +void makesocketpair(int (&sv)[2]) { + int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); + assert(rc >= 0); + + // Set non-blocking + rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); + assert(rc >= 0); +} + int main(int /*argc*/, char** /*argv*/) { try { int sv[2]; - int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); - assert(rc >= 0); - - // Set non-blocking - rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); - assert(rc >= 0); - - rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); - assert(rc >= 0); + makesocketpair(sv); // Make up a large string string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; @@ -92,16 +96,13 @@ int main(int /*argc*/, char** /*argv*/) // Read as much as we can from socket 0 int bytesRead = readALot(sv[0]); assert(bytesRead == 0); - cout << "Read(0): " << bytesRead << " bytes\n"; // Write as much as we can to socket 0 int bytesWritten = writeALot(sv[0], testString); - cout << "Wrote(0): " << bytesWritten << " bytes\n"; // Read as much as we can from socket 1 bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); - cout << "Read(1): " << bytesRead << " bytes\n"; auto_ptr<Poller> poller(new Poller); @@ -121,7 +122,6 @@ int main(int /*argc*/, char** /*argv*/) // Write as much as we can to socket 0 bytesWritten = writeALot(sv[0], testString); - cout << "Wrote(0): " << bytesWritten << " bytes\n"; // Wait for 500ms - h0 no longer writable event = poller->wait(500000000); @@ -136,7 +136,6 @@ int main(int /*argc*/, char** /*argv*/) bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); - cout << "Read(1): " << bytesRead << " bytes\n"; // Test poller interrupt assert(poller->interrupt(h0) == true); @@ -218,6 +217,43 @@ int main(int /*argc*/, char** /*argv*/) assert(event.handle == 0); assert(event.type == Poller::SHUTDOWN); + ::close(sv[0]); + + // Test for correct interaction of shutdown and interrupts - need to have new poller + // etc. for this + makesocketpair(sv); + + auto_ptr<Poller> poller1(new Poller); + + PosixIOHandle f2(sv[0]); + PosixIOHandle f3(sv[1]); + + PollerHandle h2(f2); + PollerHandle h3(f3); + + poller1->registerHandle(h2); + poller1->monitorHandle(h2, Poller::INOUT); + event = poller1->wait(); + assert(event.handle == &h2); + assert(event.type == Poller::WRITABLE); + + // Shutdown + poller1->shutdown(); + event = poller1->wait(); + assert(event.handle == 0); + assert(event.type == Poller::SHUTDOWN); + + assert(poller1->interrupt(h2) == true); + event = poller1->wait(); + assert(event.handle == &h2); + assert(event.type == Poller::INTERRUPTED); + poller1->unmonitorHandle(h2, Poller::INOUT); + + event = poller1->wait(); + assert(event.handle == 0); + assert(event.type == Poller::SHUTDOWN); + + poller1->unregisterHandle(h2); return 0; } catch (exception& e) { cout << "Caught exception " << e.what() << "\n"; |