summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-25 18:20:50 +0000
committerAlan Conway <aconway@apache.org>2009-05-25 18:20:50 +0000
commit92be0a347cdfd9c407611982c72938ecde777a8c (patch)
tree0b75b63ba491584667af21d244df96648636d9e7 /cpp
parent21dd878012fad826d335bc8aa7d9e9b88da7d6ff (diff)
downloadqpid-python-92be0a347cdfd9c407611982c72938ecde777a8c.tar.gz
PollableQueue optimization - replace deque with vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@778464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp13
-rw-r--r--cpp/src/qpid/broker/QueueEvents.h2
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/Event.cpp6
-rw-r--r--cpp/src/qpid/cluster/Event.h6
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp7
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h3
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h15
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h23
9 files changed, 41 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index 7525e4cb76..c501a0ffbc 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -66,15 +66,14 @@ void QueueEvents::unregisterListener(const std::string& id)
}
}
-void QueueEvents::handle(EventQueue::Queue& events)
-{
+QueueEvents::EventQueue::Batch::const_iterator
+QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
- while (!events.empty()) {
- for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
- i->second(events.front());
- }
- events.pop_front();
+ for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(*i);
}
+ return events.end();
}
void QueueEvents::shutdown()
diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h
index 82abd3d20a..a3d4ea42f9 100644
--- a/cpp/src/qpid/broker/QueueEvents.h
+++ b/cpp/src/qpid/broker/QueueEvents.h
@@ -74,7 +74,7 @@ class QueueEvents
volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
- void handle(EventQueue::Queue& e);
+ EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 499d515d73..969d191bd7 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -61,8 +61,6 @@ class Connection :
{
public:
- typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
-
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
/** Shadow connection. */
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 1cb010c266..52ea84b02b 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -87,7 +87,7 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
return control(framing::AMQFrame(body), cid);
}
-iovec Event::toIovec() {
+iovec Event::toIovec() const {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
return iov;
@@ -103,8 +103,8 @@ void EventHeader::encode(Buffer& b) const {
}
// Encode my header in my buffer.
-void Event::encodeHeader () {
- Buffer b(getStore(), HEADER_SIZE);
+void Event::encodeHeader () const {
+ Buffer b(const_cast<char*>(getStore()), HEADER_SIZE);
encode(b);
assert(b.getPosition() == HEADER_SIZE);
}
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index f0e445a08c..76ba88a87f 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -54,7 +54,7 @@ class EventHeader {
/** Size of payload data, excluding header. */
size_t getSize() const { return size; }
/** Size of header + payload. */
- size_t getStoreSize() { return size + HEADER_SIZE; }
+ size_t getStoreSize() const { return size + HEADER_SIZE; }
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -99,10 +99,10 @@ class Event : public EventHeader {
operator framing::Buffer() const;
- iovec toIovec();
+ iovec toIovec() const;
private:
- void encodeHeader();
+ void encodeHeader() const;
RefCountedBuffer::pointer store;
};
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index f72867de4d..fee13c92c8 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -71,9 +71,9 @@ void Multicaster::mcast(const Event& e) {
}
-void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
- PollableEventQueue::Queue::iterator i = values.begin();
+ PollableEventQueue::Batch::const_iterator i = values.begin();
while( i != values.end()) {
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
@@ -82,12 +82,13 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
}
++i;
}
- values.erase(values.begin(), i); // Erase sent events.
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast error: " << e.what());
queue.stop();
onError();
+ return values.end();
}
}
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index e1014fa499..1566c66a75 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -28,6 +28,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
+#include <deque>
namespace qpid {
@@ -63,7 +64,7 @@ class Multicaster
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
- void sendMcast(PollableEventQueue::Queue& );
+ PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
sys::Mutex lock;
boost::function<void()> onError;
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index a44c39ad85..db98faa9da 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -37,24 +37,27 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
typedef boost::function<void (const T&)> Callback;
typedef boost::function<void()> ErrorCallback;
- PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
- : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+ PollableQueue(Callback f, ErrorCallback err, const std::string& msg,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
+ poller),
callback(f), error(err), message(msg) {}
- void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
+ typename sys::PollableQueue<T>::Batch::const_iterator
+ handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
try {
- typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+ typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin();
while (i != values.end() && !this->isStopped()) {
callback(*i);
++i;
}
- values.erase(values.begin(), i);
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(error, message << ": " << e.what());
- values.clear();
this->stop();
error();
+ return values.end();
}
}
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index f8acf0a5f6..1d390a6eb0 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -28,7 +28,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
-#include <deque>
+#include <vector>
namespace qpid {
namespace sys {
@@ -44,16 +44,18 @@ class Poller;
template <class T>
class PollableQueue {
public:
- typedef std::deque<T> Queue;
+ typedef std::vector<T> Batch;
typedef T value_type;
/**
* Callback to process a batch of items from the queue.
*
- * @param values Queue of values to process. Any items remaining
+ * @param batch Queue of values to process. Any items remaining
* on return from Callback are put back on the queue.
+ * @return iterator pointing to the first un-processed item in batch.
+ * Items from this point up to batch.end() are put back on the queue.
*/
- typedef boost::function<void (Queue&)> Callback;
+ typedef boost::function<typename Batch::const_iterator (const Batch& batch)> Callback;
/**
* Constructor; sets necessary parameters.
@@ -99,7 +101,7 @@ class PollableQueue {
mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
- Queue queue, batch;
+ Batch queue, batch;
Thread dispatcher;
bool stopped;
};
@@ -141,17 +143,18 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) {
}
template <class T> void PollableQueue<T>::process() {
+ // Called with lock held
while (!stopped && !queue.empty()) {
assert(batch.empty());
batch.swap(queue);
+ typename Batch::const_iterator putBack;
{
ScopedUnlock u(lock); // Allow concurrent push to queue.
- callback(batch);
- }
- if (!batch.empty()) {
- queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
- batch.clear();
+ putBack = callback(batch);
}
+ // put back unprocessed items.
+ queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
+ batch.clear();
}
}