diff options
author | Alan Conway <aconway@apache.org> | 2009-05-25 18:20:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-25 18:20:50 +0000 |
commit | 92be0a347cdfd9c407611982c72938ecde777a8c (patch) | |
tree | 0b75b63ba491584667af21d244df96648636d9e7 /cpp | |
parent | 21dd878012fad826d335bc8aa7d9e9b88da7d6ff (diff) | |
download | qpid-python-92be0a347cdfd9c407611982c72938ecde777a8c.tar.gz |
PollableQueue optimization - replace deque with vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@778464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 23 |
9 files changed, 41 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index 7525e4cb76..c501a0ffbc 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -66,15 +66,14 @@ void QueueEvents::unregisterListener(const std::string& id) } } -void QueueEvents::handle(EventQueue::Queue& events) -{ +QueueEvents::EventQueue::Batch::const_iterator +QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); - while (!events.empty()) { - for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { - i->second(events.front()); - } - events.pop_front(); + for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(*i); } + return events.end(); } void QueueEvents::shutdown() diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h index 82abd3d20a..a3d4ea42f9 100644 --- a/cpp/src/qpid/broker/QueueEvents.h +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -74,7 +74,7 @@ class QueueEvents volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access - void handle(EventQueue::Queue& e); + EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 499d515d73..969d191bd7 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -61,8 +61,6 @@ class Connection : { public: - typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection. */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); /** Shadow connection. */ diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 1cb010c266..52ea84b02b 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -87,7 +87,7 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { return control(framing::AMQFrame(body), cid); } -iovec Event::toIovec() { +iovec Event::toIovec() const { encodeHeader(); iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; return iov; @@ -103,8 +103,8 @@ void EventHeader::encode(Buffer& b) const { } // Encode my header in my buffer. -void Event::encodeHeader () { - Buffer b(getStore(), HEADER_SIZE); +void Event::encodeHeader () const { + Buffer b(const_cast<char*>(getStore()), HEADER_SIZE); encode(b); assert(b.getPosition() == HEADER_SIZE); } diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index f0e445a08c..76ba88a87f 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -54,7 +54,7 @@ class EventHeader { /** Size of payload data, excluding header. */ size_t getSize() const { return size; } /** Size of header + payload. */ - size_t getStoreSize() { return size + HEADER_SIZE; } + size_t getStoreSize() const { return size + HEADER_SIZE; } bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } @@ -99,10 +99,10 @@ class Event : public EventHeader { operator framing::Buffer() const; - iovec toIovec(); + iovec toIovec() const; private: - void encodeHeader(); + void encodeHeader() const; RefCountedBuffer::pointer store; }; diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index f72867de4d..fee13c92c8 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -71,9 +71,9 @@ void Multicaster::mcast(const Event& e) { } -void Multicaster::sendMcast(PollableEventQueue::Queue& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { - PollableEventQueue::Queue::iterator i = values.begin(); + PollableEventQueue::Batch::const_iterator i = values.begin(); while( i != values.end()) { iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { @@ -82,12 +82,13 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { } ++i; } - values.erase(values.begin(), i); // Erase sent events. + return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); + return values.end(); } } diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index e1014fa499..1566c66a75 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -28,6 +28,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> +#include <deque> namespace qpid { @@ -63,7 +64,7 @@ class Multicaster typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; - void sendMcast(PollableEventQueue::Queue& ); + PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& ); sys::Mutex lock; boost::function<void()> onError; diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h index a44c39ad85..db98faa9da 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -37,24 +37,27 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { typedef boost::function<void (const T&)> Callback; typedef boost::function<void()> ErrorCallback; - PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller) - : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), + PollableQueue(Callback f, ErrorCallback err, const std::string& msg, + const boost::shared_ptr<sys::Poller>& poller) + : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), + poller), callback(f), error(err), message(msg) {} - void handleBatch(typename sys::PollableQueue<T>::Queue& values) { + typename sys::PollableQueue<T>::Batch::const_iterator + handleBatch(const typename sys::PollableQueue<T>::Batch& values) { try { - typename sys::PollableQueue<T>::Queue::iterator i = values.begin(); + typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin(); while (i != values.end() && !this->isStopped()) { callback(*i); ++i; } - values.erase(values.begin(), i); + return i; } catch (const std::exception& e) { QPID_LOG(error, message << ": " << e.what()); - values.clear(); this->stop(); error(); + return values.end(); } } diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index f8acf0a5f6..1d390a6eb0 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -28,7 +28,7 @@ #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> -#include <deque> +#include <vector> namespace qpid { namespace sys { @@ -44,16 +44,18 @@ class Poller; template <class T> class PollableQueue { public: - typedef std::deque<T> Queue; + typedef std::vector<T> Batch; typedef T value_type; /** * Callback to process a batch of items from the queue. * - * @param values Queue of values to process. Any items remaining + * @param batch Queue of values to process. Any items remaining * on return from Callback are put back on the queue. + * @return iterator pointing to the first un-processed item in batch. + * Items from this point up to batch.end() are put back on the queue. */ - typedef boost::function<void (Queue&)> Callback; + typedef boost::function<typename Batch::const_iterator (const Batch& batch)> Callback; /** * Constructor; sets necessary parameters. @@ -99,7 +101,7 @@ class PollableQueue { mutable sys::Monitor lock; Callback callback; PollableCondition condition; - Queue queue, batch; + Batch queue, batch; Thread dispatcher; bool stopped; }; @@ -141,17 +143,18 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { } template <class T> void PollableQueue<T>::process() { + // Called with lock held while (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); + typename Batch::const_iterator putBack; { ScopedUnlock u(lock); // Allow concurrent push to queue. - callback(batch); - } - if (!batch.empty()) { - queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items. - batch.clear(); + putBack = callback(batch); } + // put back unprocessed items. + queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); + batch.clear(); } } |