diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
commit | 9a933ae9011d343a75929136269fe45c6b863a17 (patch) | |
tree | 29ebd71241d810af6e0f20d7e5694cba1607486f /cpp/src | |
parent | 820071d5a9959a2923269751ddcff2ed085b239a (diff) | |
download | qpid-python-9a933ae9011d343a75929136269fe45c6b863a17.tar.gz |
Work on the low level IO code:
* Introduce code so that you can interrupt waiting for a handle and receive
a callback that is correctly serialised with the IO callbacks for that
handle
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Poller.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 188 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/IOHandle.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/PrivatePosix.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/DispatcherTest.cpp | 87 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/tests/PollerTest.cpp | 54 |
13 files changed, 394 insertions, 83 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 68e441349a..0a2a1ca1b4 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -114,6 +114,7 @@ public: typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function1<void, AsynchIO&> RequestCallback; // Call create() to allocate a new AsynchIO object with the specified // callbacks. This method is implemented in platform-specific code to @@ -138,6 +139,7 @@ public: virtual void queueWriteClose() = 0; virtual bool writeQueueEmpty() = 0; virtual void startReading() = 0; + virtual void requestCallback(RequestCallback) = 0; virtual BufferBase* getQueuedBuffer() = 0; protected: diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp index 4722fc0b8b..cbdee7eda6 100644 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/cpp/src/qpid/sys/DispatchHandle.cpp @@ -270,22 +270,30 @@ void DispatchHandle::stopWatch() { case IDLE: case DELAYED_IDLE: case DELAYED_DELETE: - return; + return; case DELAYED_R: case DELAYED_W: case DELAYED_RW: case DELAYED_INACTIVE: - state = DELAYED_IDLE; - break; + state = DELAYED_IDLE; + break; default: - state = IDLE; - break; + state = IDLE; + break; } assert(poller); poller->delFd(*this); poller.reset(); } +void DispatchHandle::call(Callback iCb) { + assert(iCb); + ScopedLock<Mutex> lock(stateLock); + interruptedCallbacks.push(iCb); + + (void) poller->interrupt(*this); +} + // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete @@ -302,9 +310,9 @@ void DispatchHandle::doDelete() { state = DELAYED_DELETE; return; case IDLE: - break; + break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states assert(false); } } @@ -368,14 +376,29 @@ void DispatchHandle::processEvent(Poller::EventType type) { disconnectedCallback(*this); } break; + case Poller::INTERRUPTED: + { + ScopedLock<Mutex> lock(stateLock); + assert(interruptedCallbacks.size() > 0); + // We'll actually do the interrupt below + } + break; default: assert(false); } - // If any of the callbacks re-enabled reading/writing then actually - // do it now { ScopedLock<Mutex> lock(stateLock); + // If we've got a pending interrupt do it now + while (interruptedCallbacks.size() > 0) { + Callback cb = interruptedCallbacks.front(); + assert(cb); + cb(*this); + interruptedCallbacks.pop(); + } + + // If any of the callbacks re-enabled reading/writing then actually + // do it now switch (state) { case DELAYED_R: poller->modFd(*this, Poller::INPUT); diff --git a/cpp/src/qpid/sys/DispatchHandle.h b/cpp/src/qpid/sys/DispatchHandle.h index 219f2c53d6..ffcbd80f7e 100644 --- a/cpp/src/qpid/sys/DispatchHandle.h +++ b/cpp/src/qpid/sys/DispatchHandle.h @@ -27,6 +27,7 @@ #include <boost/function.hpp> +#include <queue> namespace qpid { namespace sys { @@ -53,11 +54,13 @@ class DispatchHandle : public PollerHandle { friend class DispatchHandleRef; public: typedef boost::function1<void, DispatchHandle&> Callback; + typedef std::queue<Callback> CallbackQueue; private: Callback readableCallback; Callback writableCallback; Callback disconnectedCallback; + CallbackQueue interruptedCallbacks; Poller::shared_ptr poller; Mutex stateLock; enum { @@ -92,12 +95,12 @@ public: /** Add this DispatchHandle to the poller to be watched. */ void startWatch(Poller::shared_ptr poller); - /** Resume watchingn for all non-0 callbacks. */ + /** Resume watching for all non-0 callbacks. */ void rewatch(); - /** Resume watchingn for read only. */ + /** Resume watching for read only. */ void rewatchRead(); - /** Resume watchingn for write only. */ + /** Resume watching for write only. */ void rewatchWrite(); /** Stop watching temporarily. The DispatchHandle remains @@ -112,6 +115,11 @@ public: /** Stop watching permanently. Disassociates from the poller. */ void stopWatch(); + /** Interrupt watching this handle and make a serialised callback that respects the + * same exclusivity guarantees as the other callbacks + */ + void call(Callback iCb); + protected: /** Override to get extra processing done when the DispatchHandle is deleted. */ void doDelete(); @@ -139,6 +147,7 @@ public: void unwatchRead() { ref->unwatchRead(); } void unwatchWrite() { ref->unwatchWrite(); } void stopWatch() { ref->stopWatch(); } + void call(Callback iCb) { ref->call(iCb); } }; }} diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 8d1d1b79f5..e8ae6bc2fe 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -34,26 +34,7 @@ Dispatcher::~Dispatcher() { } void Dispatcher::run() { - do { - Poller::Event event = poller->wait(); - - // If can read/write then dispatch appropriate callbacks - if (event.handle) { - event.process(); - } else { - // Handle shutdown - switch (event.type) { - case Poller::SHUTDOWN: - goto dispatcher_shutdown; - default: - // This should be impossible - assert(false); - } - } - } while (true); - -dispatcher_shutdown: - ; + poller->run(); } }} diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 6b7f4d818e..8e9f67fefd 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -23,6 +23,7 @@ */ #include "Time.h" +#include "Runnable.h" #include <boost/shared_ptr.hpp> @@ -37,7 +38,7 @@ namespace sys { */ class PollerHandle; class PollerPrivate; -class Poller { +class Poller : public Runnable { PollerPrivate* const impl; public: @@ -57,7 +58,8 @@ public: READ_WRITABLE, DISCONNECTED, SHUTDOWN, - TIMEOUT + TIMEOUT, + INTERRUPTED }; struct Event { @@ -76,6 +78,20 @@ public: ~Poller(); /** Note: this function is async-signal safe */ void shutdown(); + + // Interrupt waiting for a specific poller handle + // returns true if we could interrupt the handle + // - in this case on return the handle is no longer being monitored, + // but we will receive an event from some invocation of poller::wait + // with the handle and the INTERRUPTED event type + // if it returns false then the handle is not being monitored by the poller + // - This can either be because it has just received an event which has been + // reported and has not been reenabled since. Or because it was removed + // from the monitoring set + bool interrupt(PollerHandle& handle); + + // Poller run loop + void run(); void addFd(PollerHandle& handle, Direction dir); void delFd(PollerHandle& handle); diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index a1e624ea75..10705e12da 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -28,9 +28,10 @@ #include <sys/epoll.h> #include <errno.h> +#include <signal.h> #include <assert.h> -#include <vector> +#include <queue> #include <exception> namespace qpid { @@ -58,14 +59,12 @@ class PollerHandlePrivate { int fd; ::__uint32_t events; - PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f, PollerHandle* p) : + PollerHandlePrivate(int f) : fd(f), events(0), - pollerHandle(p), stat(ABSENT) { } @@ -112,7 +111,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), this)) + impl(new PollerHandlePrivate(toFd(h.impl))) {} PollerHandle::~PollerHandle() { @@ -161,9 +160,47 @@ class PollerPrivate { }; static ReadablePipe alwaysReadable; - + static int alwaysReadableFd; + + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } + + public: + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + const int epollFd; bool isShutdown; + InterruptHandle interruptHandle; + ::sigset_t sigMask; static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { switch (dir) { @@ -193,15 +230,41 @@ class PollerPrivate { epollFd(::epoll_create(DefaultFds)), isShutdown(false) { QPID_POSIX_CHECK(epollFd); + ::sigemptyset(&sigMask); + // Add always readable fd into our set (but not listening to it yet) + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); } ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); } + + void interrupt(bool all=false) { + ::epoll_event epe; + if (all) { + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 0; // Keep valgrind happy + } else { + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + } + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } + + void interruptAll() { + interrupt(true); + } }; PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; +int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); void Poller::addFd(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; @@ -218,7 +281,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); @@ -249,7 +312,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -266,7 +329,7 @@ void Poller::rearmFd(PollerHandle& handle) { ::epoll_event epe; epe.events = eh.events; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -281,28 +344,85 @@ void Poller::shutdown() { if (impl->isShutdown) return; - // Don't use any locking here - isshutdown will be visible to all + // Don't use any locking here - isShutdown will be visible to all // after the epoll_ctl() anyway (it's a memory barrier) impl->isShutdown = true; - // Add always readable fd to epoll (not EPOLLONESHOT) - int fd = impl->alwaysReadable.getFD(); - ::epoll_event epe; - epe.events = ::EPOLLIN; - epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now - epe.data.ptr = 0; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe)); + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isInactive()) { + return false; + } + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + eh.setInactive(); + } + + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); } Poller::Event Poller::wait(Duration timeout) { epoll_event epe; int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); - // Repeat until we weren't interupted + // Repeat until we weren't interrupted by signal do { PollerHandleDeletionManager.markAllUnusedInThisThread(); + // Need to run on kernels without epoll_pwait() + // - fortunately in this case we don't really need the atomicity of epoll_pwait() +#if 1 + sigset_t os; + pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os); int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); - + pthread_sigmask(SIG_SETMASK, &os, 0); +#else + int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask); +#endif + // Check for shutdown if (impl->isShutdown) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, SHUTDOWN); @@ -312,13 +432,27 @@ Poller::Event Poller::wait(Duration timeout) { QPID_POSIX_CHECK(rc); } else if (rc > 0) { assert(rc == 1); - PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr); + PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + PollerHandlePrivate& eh = *handle->impl; ScopedLock<Mutex> l(eh.lock); - + // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { - PollerHandle* handle = eh.pollerHandle; + + // Check if this is an interrupt + if (handle == &impl->interruptHandle) { + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again @@ -349,10 +483,12 @@ Poller::Event Poller::wait(Duration timeout) { // The only things we can do here are return a timeout or wait more. // Obviously if we timed out we return timeout; if the wait was meant to // be indefinite then we should never return with a time out so we go again. - // If the wait wasn't indefinite, but we were interrupted then we have to return - // with a timeout as we don't know how long we've waited so far and so we can't - // continue the wait. - if (rc == 0 || timeoutMs != -1) { + // If the wait wasn't indefinite, we check whether we are after the target wait + // time or not + if (timeoutMs == -1) { + continue; + } + if (rc == 0 && now() > targetTimeout) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, TIMEOUT); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 9a5798311b..b4fede06fd 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -266,6 +266,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); private: @@ -275,6 +276,7 @@ private: void readable(DispatchHandle& handle); void writeable(DispatchHandle& handle); void disconnected(DispatchHandle& handle); + void requestedCall(RequestCallback); void close(DispatchHandle& handle); private: @@ -386,6 +388,18 @@ void AsynchIO::startReading() { DispatchHandle::rewatchRead(); } +void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); +} + +void AsynchIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + /** Return a queued buffer if there are enough * to spare */ diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp index 80b487eadc..075eb4c335 100644 --- a/cpp/src/qpid/sys/posix/IOHandle.cpp +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -31,6 +31,8 @@ int toFd(const IOHandlePrivate* h) return h->fd; } +NullIOHandle DummyIOHandle; + IOHandle::IOHandle(IOHandlePrivate* h) : impl(h) {} diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h index 33c0cd81bc..0fefa50ab6 100644 --- a/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/Time.h" +#include "qpid/sys/IOHandle.h" struct timespec; struct timeval; @@ -47,6 +48,25 @@ public: int toFd(const IOHandlePrivate* h); +// Posix fd as an IOHandle +class PosixIOHandle : public IOHandle { +public: + PosixIOHandle(int fd) : + IOHandle(new IOHandlePrivate(fd)) + {} +}; + +// Dummy IOHandle for places it's required in the API +// but we promise not to actually try to do any operations on the IOHandle +class NullIOHandle : public IOHandle { +public: + NullIOHandle() : + IOHandle(new IOHandlePrivate) + {} +}; + +extern NullIOHandle DummyIOHandle; + }} #endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index ca56efd8dd..356d5ba927 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -284,6 +284,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void requestCallback(RequestCallback); /** * getQueuedBuffer returns a buffer from the buffer queue, if one is @@ -531,6 +532,11 @@ void AsynchIO::startReading() { return; } +// TODO: This needs to arrange for a callback that is serialised with +// the other IO callbacks for this AsynchIO +void AsynchIO::requestCallback(RequestCallback callback) { +} + /** * Return a queued buffer if there are enough to spare. */ diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp index 7631956acc..c2f6bca12a 100644 --- a/cpp/src/tests/DispatcherTest.cpp +++ b/cpp/src/tests/DispatcherTest.cpp @@ -20,7 +20,10 @@ */ #include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Dispatcher.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/posix/PrivatePosix.h" #include "qpid/sys/Thread.h" #include <sys/types.h> @@ -28,6 +31,7 @@ #include <fcntl.h> #include <unistd.h> #include <errno.h> +#include <signal.h> #include <iostream> #include <boost/bind.hpp> @@ -74,7 +78,26 @@ void reader(DispatchHandle& h, int fd) { h.rewatch(); } -int main(int argc, char** argv) +DispatchHandle* rh = 0; +DispatchHandle* wh = 0; + +void rInterrupt(DispatchHandle&) { + cerr << "R"; +} + +void wInterrupt(DispatchHandle&) { + cerr << "W"; +} + +DispatchHandle::Callback rcb = rInterrupt; +DispatchHandle::Callback wcb = wInterrupt; + +void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + rh->call(rcb); + wh->call(wcb); +} + +int main(int /*argc*/, char** /*argv*/) { // Create poller Poller::shared_ptr poller(new Poller); @@ -82,12 +105,12 @@ int main(int argc, char** argv) // Create dispatcher thread Dispatcher d(poller); Dispatcher d1(poller); - //Dispatcher d2(poller); - //Dispatcher d3(poller); + Dispatcher d2(poller); + Dispatcher d3(poller); Thread dt(d); Thread dt1(d1); - //Thread dt2(d2); - //Thread dt3(d3); + Thread dt2(d2); + Thread dt3(d3); // Setup sender and receiver int sv[2]; @@ -106,22 +129,58 @@ int main(int argc, char** argv) for (int i = 0; i < 8; i++) testString += testString; - DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0); - DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString)); + PosixIOHandle f0(sv[0]); + PosixIOHandle f1(sv[1]); - rh.watch(poller); - wh.watch(poller); + rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); - // wait 2 minutes then shutdown - sleep(60); + rh->startWatch(poller); + wh->startWatch(poller); + + // Set up a regular itimer interupt + + // Ignore signal in this thread + ::sigset_t sm; + ::sigemptyset(&sm); + ::sigaddset(&sm, SIGRTMIN); + ::pthread_sigmask(SIG_BLOCK, &sm, 0); + + // Signal handling + struct ::sigaction sa; + sa.sa_sigaction = timer_handler; + sa.sa_flags = SA_RESTART | SA_SIGINFO; + ::sigemptyset(&sa.sa_mask); + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + ::sigevent se; + se.sigev_notify = SIGEV_SIGNAL; + se.sigev_signo = SIGRTMIN; + se.sigev_value.sival_ptr = 0; + timer_t timer; + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + itimerspec ts = { + /*.it_value = */ {2, 0}, // s, ns + /*.it_interval = */ {2, 0}}; // s, ns + + rc = ::timer_settime(timer, 0, &ts, 0); + assert(rc == 0); + + // wait 2 minutes then shutdown + ::sleep(60); + + rc = ::timer_delete(timer); + assert(rc == 0); poller->shutdown(); dt.join(); dt1.join(); - //dt2.join(); - //dt3.join(); + dt2.join(); + dt3.join(); - cout << "Wrote: " << writtenBytes << "\n"; + cout << "\nWrote: " << writtenBytes << "\n"; cout << "Read: " << readBytes << "\n"; return 0; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3a608b2bae..47439d0bab 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -187,6 +187,13 @@ check_PROGRAMS+=sender sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h sender_LDADD=$(lib_client) +check_PROGRAMS+=PollerTest +PollerTest_SOURCES=PollerTest.cpp +PollerTest_LDADD=$(lib_common) + +check_PROGRAMS+=DispatcherTest +DispatcherTest_SOURCES=DispatcherTest.cpp +DispatcherTest_LDADD=$(lib_common) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp index fcb1d0dadf..4f11dc5901 100644 --- a/cpp/src/tests/PollerTest.cpp +++ b/cpp/src/tests/PollerTest.cpp @@ -23,7 +23,9 @@ * Use socketpair to test the poller */ +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/posix/PrivatePosix.h" #include <string> #include <iostream> @@ -67,7 +69,7 @@ int readALot(int fd) { return bytesRead; } -int main(int argc, char** argv) +int main(int /*argc*/, char** /*argv*/) { try { @@ -103,15 +105,18 @@ int main(int argc, char** argv) auto_ptr<Poller> poller(new Poller); - PollerHandle h0(sv[0]); - PollerHandle h1(sv[1]); + PosixIOHandle f0(sv[0]); + PosixIOHandle f1(sv[1]); + + PollerHandle h0(f0); + PollerHandle h1(f1); poller->addFd(h0, Poller::INOUT); - // Wait for 500ms - h0 should be writable + // h0 should be writable Poller::Event event = poller->wait(); assert(event.handle == &h0); - assert(event.dir == Poller::OUT); + assert(event.type == Poller::WRITABLE); // Write as much as we can to socket 0 bytesWritten = writeALot(sv[0], testString); @@ -126,17 +131,48 @@ int main(int argc, char** argv) poller->addFd(h1, Poller::INOUT); event = poller->wait(); assert(event.handle == &h1); - assert(event.dir == Poller::INOUT); + assert(event.type == Poller::READ_WRITABLE); + + // Can't interrupt, it's not active + assert(poller->interrupt(h1) == false); bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); cout << "Read(1): " << bytesRead << " bytes\n"; + + // Test poller interrupt + assert(poller->interrupt(h0) == true); + event = poller->wait(); + assert(event.handle == &h0); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h0) == false); + + // Test multiple interrupts + poller->rearmFd(h0); + poller->rearmFd(h1); + assert(poller->interrupt(h0) == true); + assert(poller->interrupt(h1) == true); + // Make sure we can't interrupt them again + assert(poller->interrupt(h0) == false); + assert(poller->interrupt(h1) == false); + + // Make sure that they both come out in the correct order + event = poller->wait(); + assert(event.handle == &h0); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h0) == false); + event = poller->wait(); + assert(event.handle == &h1); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h1) == false); + // At this point h1 should have been disabled from the poller // (as it was just returned) and h0 can write again + poller->rearmFd(h0); event = poller->wait(); assert(event.handle == &h0); - assert(event.dir == Poller::OUT); + assert(event.type == Poller::WRITABLE); // Now both the handles should be disabled event = poller->wait(500000000); @@ -146,11 +182,11 @@ int main(int argc, char** argv) poller->shutdown(); event = poller->wait(); assert(event.handle == 0); - assert(event.dir == Poller::SHUTDOWN); + assert(event.type == Poller::SHUTDOWN); event = poller->wait(); assert(event.handle == 0); - assert(event.dir == Poller::SHUTDOWN); + assert(event.type == Poller::SHUTDOWN); poller->delFd(h1); poller->delFd(h0); |