diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-02-17 15:30:43 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-02-17 15:30:43 +0000 |
commit | 5bf8ba1094333927535b3bee35aec7cb2fa9f6a9 (patch) | |
tree | 180454c44f59052286ce0240ca351755ae138b78 | |
parent | 73a5bfd85f464a4493ec92e4fde7c21f95fd63ac (diff) | |
download | qpid-python-5bf8ba1094333927535b3bee35aec7cb2fa9f6a9.tar.gz |
Refactor PollableCondition to be portable; match PollableQueue to it; initial port on Linux
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/win-pollable-condition@745121 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/PollableCondition.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/PollableQueue.h | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp | 119 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/PollableCondition.h | 56 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueEvents.cpp | 1 |
6 files changed, 183 insertions, 94 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index d5b53dc502..4ae1af0afb 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -139,7 +139,6 @@ posix_plat_hdr = \ qpid/sys/posix/PrivatePosix.h \ qpid/sys/posix/Mutex.h \ qpid/sys/posix/Fork.h \ - qpid/sys/posix/PollableCondition.h \ qpid/sys/posix/IntegerTypes.h \ qpid/sys/posix/Time.h diff --git a/qpid/cpp/src/qpid/sys/PollableCondition.h b/qpid/cpp/src/qpid/sys/PollableCondition.h index 56d38f90da..49e84e6cb0 100644 --- a/qpid/cpp/src/qpid/sys/PollableCondition.h +++ b/qpid/cpp/src/qpid/sys/PollableCondition.h @@ -22,7 +22,57 @@ * */ -// Currently only has a posix implementation, add #ifdefs for other platforms as needed. -#include "posix/PollableCondition.h" +#include "qpid/sys/Poller.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + + +namespace qpid { +namespace sys { + +class PollableConditionPrivate; + +class PollableCondition { +public: + typedef boost::function1<void, PollableCondition&> Callback; + + PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller); + + ~PollableCondition(); + + /** + * Set the condition. Triggers callback to Callback from Poller. + * When callback is made, condition is suspended. Call rearm() to + * resume reacting to the condition. + */ + void set(); + + /** + * Get the current state of the condition, then clear it. + * + * @return The state of the condition before it was cleared. + */ + bool clear(); + + /** + * Temporarily suspend the ability for the poller to react to the + * condition. It can be rearm()ed later. + */ + void disarm(); + + /** + * Reset the ability for the poller to react to the condition. + */ + void rearm(); + + private: + PollableConditionPrivate *impl; + + Callback callback; + boost::shared_ptr<sys::Poller> poller; +}; + +}} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index b5ff98c2c7..a23cc5137a 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -23,8 +23,6 @@ */ #include "qpid/sys/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include <boost/function.hpp> @@ -38,9 +36,10 @@ namespace sys { class Poller; /** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. + * A queue whose item processing is dispatched by sys::Poller. + * Any thread can push to the queue; items pushed trigger an event the Poller + * recognizes. When a Poller I/O thread dispatches the event, a + * user-specified callback is invoked with all items on the queue. */ template <class T> class PollableQueue { @@ -50,12 +49,21 @@ class PollableQueue { /** * Callback to process a batch of items from the queue. - * @param values to process, any items remaining after call are put back on the queue. + * + * @param values Queue of values to process. Any items remaining + * on return from Callback are put back on the queue. */ typedef boost::function<void (Queue& values)> Callback; - /** When the queue is selected by the poller, values are passed to callback cb. */ - PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + /** + * Constructor; sets necessary parameters. + * + * @param cb Callback that will be called to process items on the + * queue. Will be called from a Poller I/O thread. + * @param poller Poller to use for dispatching queue events. + */ + PollableQueue(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller); ~PollableQueue(); @@ -85,14 +93,12 @@ class PollableQueue { typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; - void dispatch(sys::DispatchHandle&); + void dispatch(PollableCondition& cond); void process(); mutable sys::Monitor lock; Callback callback; - boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - DispatchHandleRef handle; Queue queue, batch; Thread dispatcher; bool stopped; @@ -100,11 +106,10 @@ class PollableQueue { template <class T> PollableQueue<T>::PollableQueue( const Callback& cb, const boost::shared_ptr<sys::Poller>& p) - : callback(cb), poller(p), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) + : callback(cb), + condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p), + stopped(true) { - handle.startWatch(poller); - handle.unwatch(); } template <class T> void PollableQueue<T>::start() { @@ -112,11 +117,10 @@ template <class T> void PollableQueue<T>::start() { if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); - handle.rewatch(); + condition.rearm(); } template <class T> PollableQueue<T>::~PollableQueue() { - handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { @@ -125,15 +129,15 @@ template <class T> void PollableQueue<T>::push(const T& t) { queue.push_back(t); } -template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { +template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); process(); dispatcher = Thread(); - if (queue.empty()) condition.clear(); + if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + else cond.rearm(); } template <class T> void PollableQueue<T>::process() { @@ -159,7 +163,7 @@ template <class T> void PollableQueue<T>::shutdown() { template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); if (stopped) return; - handle.unwatch(); + condition.disarm(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread while (dispatcher.id() && dispatcher.id() != Thread::current().id()) diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp index 0c55fd3c0d..0991e5fd76 100644 --- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -22,17 +22,46 @@ * */ -#include "PollableCondition.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" +#include <boost/bind.hpp> + #include <unistd.h> #include <fcntl.h> namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +class PollableConditionPrivate : public sys::IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void dispatch(sys::DispatchHandle& h); + void rewatch(); + void unwatch(); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + int writeFd; + std::auto_ptr<DispatchHandleRef> handle; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +{ int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); @@ -42,22 +71,71 @@ PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + handle.reset (new DispatchHandleRef(*this, + boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), + 0, 0)); + handle->startWatch(poller); + handle->unwatch(); +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ + handle->stopWatch(); + close(writeFd); +} + +void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/) +{ + cb(parent); +} + +void PollableConditionPrivate::rewatch() +{ + handle->rewatch(); +} + +void PollableConditionPrivate::unwatch() +{ + handle->unwatch(); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + static const char dummy=0; + ssize_t n = ::write(impl->writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) + throw ErrnoException("Error setting PollableCondition"); } bool PollableCondition::clear() { char buf[256]; ssize_t n; bool wasSet = false; - while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) + while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) wasSet = true; - if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); + if (n == -1 && errno != EAGAIN) + throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); return wasSet; } -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); +void PollableCondition::disarm() { + impl->unwatch(); +} + +void PollableCondition::rearm() { + impl->rewatch(); } @@ -71,22 +149,35 @@ void PollableCondition::set() { namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : cb(cb), parent(parent), poller(poller), + IOHandle(new sys::IOHandlePrivate) { impl->fd = ::eventfd(0, 0); if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); } +void PollableCondition::set() { + static const uint64_t value=1; + ssize_t n = ::write(impl->impl->fd, + reinterpret_cast<const void*>(&value), 8); + if (n != 8) throw ErrnoException("write failed on conditionfd"); +} + bool PollableCondition::clear() { char buf[8]; - ssize_t n = ::read(impl->fd, buf, 8); + ssize_t n = ::read(impl->impl->fd, buf, 8); if (n != 8) throw ErrnoException("read failed on conditionfd"); return *reinterpret_cast<uint64_t*>(buf); } -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); +void PollableCondition::disarm() { + // ???? +} + +void PollableCondition::rearm() { + // ???? } #endif diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.h b/qpid/cpp/src/qpid/sys/posix/PollableCondition.h deleted file mode 100644 index 4ec277b0ec..0000000000 --- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_SYS_POSIX_POLLABLECONDITION_H -#define QPID_SYS_POSIX_POLLABLECONDITION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" - -namespace qpid { -namespace sys { - -/** - * A pollable condition to integrate in-process conditions with IO - * conditions in a polling loop. - * - * Setting the condition makes it readable for a poller. - * - * Writable/disconnected conditions are undefined and should not be - * polled for. - */ -class PollableCondition : public sys::IOHandle { - public: - PollableCondition(); - - /** Set the condition, triggers readable in a poller. */ - void set(); - - /** Get the current state of the condition, then clear it. - *@return The state of the condition before it was cleared. - */ - bool clear(); - - private: - int writeFd; -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_POSIX_POLLABLECONDITION_H*/ diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp index 3e377d04b3..df3c937e33 100644 --- a/qpid/cpp/src/tests/QueueEvents.cpp +++ b/qpid/cpp/src/tests/QueueEvents.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/QueueEvents.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Dispatcher.h" #include <boost/bind.hpp> #include <boost/format.hpp> |