diff options
author | Alan Conway <aconway@apache.org> | 2009-07-09 20:38:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-09 20:38:23 +0000 |
commit | 0c5ace5d2114d638bdef5f4c2e29521c43f140a4 (patch) | |
tree | 575392461bf2bc98f8ea4b8f2c9241aad134cf55 /cpp/src | |
parent | c25ed489bd7a5ae7fce248dcc105b9ad7f6a1e65 (diff) | |
download | qpid-python-0c5ace5d2114d638bdef5f4c2e29521c43f140a4.tar.gz |
Simplified PollableCondition
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@792676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/PollableCondition.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/PollableCondition.cpp | 113 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/PollableCondition.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/PollableCondition.cpp | 28 |
5 files changed, 41 insertions, 140 deletions
diff --git a/cpp/src/qpid/sys/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h index f49fb22cb4..2eb6f2d947 100644 --- a/cpp/src/qpid/sys/PollableCondition.h +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -44,28 +44,13 @@ public: /** * Set the condition. Triggers callback to Callback from Poller. - * When callback is made, condition is suspended. Call rearm() to - * resume reacting to the condition. */ QPID_COMMON_EXTERN void set(); /** - * Get the current state of the condition, then clear it. - * - * @return The state of the condition before it was cleared. + * Clear the condition. Stops callbacks from Poller. */ - QPID_COMMON_EXTERN bool clear(); - - /** - * Temporarily suspend the ability for the poller to react to the - * condition. It can be rearm()ed later. - */ - QPID_COMMON_EXTERN void disarm(); - - /** - * Reset the ability for the poller to react to the condition. - */ - QPID_COMMON_EXTERN void rearm(); + QPID_COMMON_EXTERN void clear(); private: PollableConditionPrivate *impl; diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 1d390a6eb0..0786b21610 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -119,7 +119,6 @@ template <class T> void PollableQueue<T>::start() { if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); - condition.rearm(); } template <class T> PollableQueue<T>::~PollableQueue() { @@ -139,7 +138,6 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { dispatcher = Thread(); if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); - else cond.rearm(); } template <class T> void PollableQueue<T>::process() { @@ -166,11 +164,11 @@ template <class T> void PollableQueue<T>::shutdown() { template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); if (stopped) return; - condition.disarm(); + condition.clear(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread - while (dispatcher.id() && dispatcher.id() != Thread::current().id()) - lock.wait(); + if (dispatcher.id() != Thread::current().id()) + while (dispatcher.id()) lock.wait(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index 0991e5fd76..b22a615a54 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -46,8 +46,8 @@ private: ~PollableConditionPrivate(); void dispatch(sys::DispatchHandle& h); - void rewatch(); - void unwatch(); + void set(); + void clear(); private: PollableCondition::Callback cb; @@ -57,10 +57,11 @@ private: std::auto_ptr<DispatchHandleRef> handle; }; -PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, - sys::PollableCondition& parent, - const boost::shared_ptr<sys::Poller>& poller) - : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +PollableConditionPrivate::PollableConditionPrivate( + const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller +) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) { int fds[2]; if (::pipe(fds) == -1) @@ -71,39 +72,41 @@ PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition: throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - handle.reset (new DispatchHandleRef(*this, - boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), - 0, 0)); + handle.reset (new DispatchHandleRef( + *this, + boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), + 0, 0)); handle->startWatch(poller); handle->unwatch(); + + // Make the read FD readable + static const char dummy=0; + ssize_t n = ::write(writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) + throw ErrnoException("Error setting PollableCondition"); } -PollableConditionPrivate::~PollableConditionPrivate() -{ +PollableConditionPrivate::~PollableConditionPrivate() { handle->stopWatch(); close(writeFd); } -void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/) -{ +void PollableConditionPrivate::dispatch(sys::DispatchHandle&) { cb(parent); } -void PollableConditionPrivate::rewatch() -{ +void PollableConditionPrivate::set() { handle->rewatch(); } -void PollableConditionPrivate::unwatch() -{ +void PollableConditionPrivate::clear() { handle->unwatch(); } - /* PollableCondition */ PollableCondition::PollableCondition(const Callback& cb, - const boost::shared_ptr<sys::Poller>& poller) - : impl(new PollableConditionPrivate(cb, *this, poller)) + const boost::shared_ptr<sys::Poller>& poller +) : impl(new PollableConditionPrivate(cb, *this, poller)) { } @@ -112,75 +115,9 @@ PollableCondition::~PollableCondition() delete impl; } -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(impl->writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) - throw ErrnoException("Error setting PollableCondition"); -} - -bool PollableCondition::clear() { - char buf[256]; - ssize_t n; - bool wasSet = false; - while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) - wasSet = true; - if (n == -1 && errno != EAGAIN) - throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); - return wasSet; -} - -void PollableCondition::disarm() { - impl->unwatch(); -} - -void PollableCondition::rearm() { - impl->rewatch(); -} - - -#if 0 -// FIXME aconway 2008-08-12: More efficient Linux implementation using -// eventfd system call. Move to separate file & do configure.ac test -// to enable this when ::eventfd() is available. - -#include <sys/eventfd.h> +void PollableCondition::set() { impl->set(); } -namespace qpid { -namespace sys { - -PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb, - sys::PollableCondition& parent, - const boost::shared_ptr<sys::Poller>& poller) - : cb(cb), parent(parent), poller(poller), - IOHandle(new sys::IOHandlePrivate) { - impl->fd = ::eventfd(0, 0); - if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); -} - -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->impl->fd, - reinterpret_cast<const void*>(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); -} - -bool PollableCondition::clear() { - char buf[8]; - ssize_t n = ::read(impl->impl->fd, buf, 8); - if (n != 8) throw ErrnoException("read failed on conditionfd"); - return *reinterpret_cast<uint64_t*>(buf); -} - -void PollableCondition::disarm() { - // ???? -} - -void PollableCondition::rearm() { - // ???? -} - -#endif +void PollableCondition::clear() { impl->clear(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp index 82913934d6..2ba9067094 100644 --- a/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -107,17 +107,8 @@ void PollableCondition::set() { impl->poke(); } -bool PollableCondition::clear() { - return (0 != ::InterlockedExchange(&impl->isSet, 0)); -} - -void PollableCondition::disarm() { - ::InterlockedExchange(&impl->armed, 0); -} - -void PollableCondition::rearm() { - if (0 == ::InterlockedExchange(&impl->armed, 1) && impl->isSet) - impl->poke(); +void PollableCondition::clear() { + ::InterlockedExchange(&impl->isSet, 0); } }} // namespace qpid::sys diff --git a/cpp/src/tests/PollableCondition.cpp b/cpp/src/tests/PollableCondition.cpp index 33664d43fc..b5cf1b4cd2 100644 --- a/cpp/src/tests/PollableCondition.cpp +++ b/cpp/src/tests/PollableCondition.cpp @@ -38,7 +38,7 @@ const Duration LONG = TIME_SEC/10; class Callback { public: - enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR }; + enum Action { NONE, CLEAR }; Callback() : count(), action(NONE) {} @@ -47,9 +47,7 @@ class Callback { ++count; switch(action) { case NONE: break; - case DISARM: pc.disarm(); break; case CLEAR: pc.clear(); break; - case DISARM_CLEAR: pc.disarm(); pc.clear(); break; } action = NONE; lock.notify(); @@ -86,27 +84,19 @@ QPID_AUTO_TEST_CASE(testPollableCondition) { Thread runner = Thread(*poller); - BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed. - - pc.rearm(); - BOOST_CHECK(callback.isNotCalling()); // Armed but not set + BOOST_CHECK(callback.isNotCalling()); // condition is not set. pc.set(); - BOOST_CHECK(callback.isCalling()); // Armed and set. - BOOST_CHECK(callback.isCalling()); // Still armed and set. - - callback.nextCall(Callback::DISARM); - BOOST_CHECK(callback.isNotCalling()); // set but not armed + BOOST_CHECK(callback.isCalling()); // Set. + BOOST_CHECK(callback.isCalling()); // Still set. - pc.rearm(); - BOOST_CHECK(callback.isCalling()); // Armed and set. - callback.nextCall(Callback::CLEAR); - BOOST_CHECK(callback.isNotCalling()); // armed but not set + callback.nextCall(Callback::CLEAR); + BOOST_CHECK(callback.isNotCalling()); // Cleared pc.set(); - BOOST_CHECK(callback.isCalling()); // Armed and set. - callback.nextCall(Callback::DISARM_CLEAR); - BOOST_CHECK(callback.isNotCalling()); // not armed or set. + BOOST_CHECK(callback.isCalling()); // Set. + callback.nextCall(Callback::CLEAR); + BOOST_CHECK(callback.isNotCalling()); // Cleared. poller->shutdown(); runner.join(); |