summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/Event.cpp6
-rw-r--r--cpp/src/qpid/cluster/Event.h6
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp7
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h3
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h15
6 files changed, 21 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 499d515d73..969d191bd7 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -61,8 +61,6 @@ class Connection :
{
public:
- typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
-
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
/** Shadow connection. */
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 1cb010c266..52ea84b02b 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -87,7 +87,7 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
return control(framing::AMQFrame(body), cid);
}
-iovec Event::toIovec() {
+iovec Event::toIovec() const {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
return iov;
@@ -103,8 +103,8 @@ void EventHeader::encode(Buffer& b) const {
}
// Encode my header in my buffer.
-void Event::encodeHeader () {
- Buffer b(getStore(), HEADER_SIZE);
+void Event::encodeHeader () const {
+ Buffer b(const_cast<char*>(getStore()), HEADER_SIZE);
encode(b);
assert(b.getPosition() == HEADER_SIZE);
}
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index f0e445a08c..76ba88a87f 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -54,7 +54,7 @@ class EventHeader {
/** Size of payload data, excluding header. */
size_t getSize() const { return size; }
/** Size of header + payload. */
- size_t getStoreSize() { return size + HEADER_SIZE; }
+ size_t getStoreSize() const { return size + HEADER_SIZE; }
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -99,10 +99,10 @@ class Event : public EventHeader {
operator framing::Buffer() const;
- iovec toIovec();
+ iovec toIovec() const;
private:
- void encodeHeader();
+ void encodeHeader() const;
RefCountedBuffer::pointer store;
};
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index f72867de4d..fee13c92c8 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -71,9 +71,9 @@ void Multicaster::mcast(const Event& e) {
}
-void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
- PollableEventQueue::Queue::iterator i = values.begin();
+ PollableEventQueue::Batch::const_iterator i = values.begin();
while( i != values.end()) {
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
@@ -82,12 +82,13 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
}
++i;
}
- values.erase(values.begin(), i); // Erase sent events.
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast error: " << e.what());
queue.stop();
onError();
+ return values.end();
}
}
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index e1014fa499..1566c66a75 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -28,6 +28,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
+#include <deque>
namespace qpid {
@@ -63,7 +64,7 @@ class Multicaster
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
- void sendMcast(PollableEventQueue::Queue& );
+ PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
sys::Mutex lock;
boost::function<void()> onError;
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index a44c39ad85..db98faa9da 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -37,24 +37,27 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
typedef boost::function<void (const T&)> Callback;
typedef boost::function<void()> ErrorCallback;
- PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
- : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+ PollableQueue(Callback f, ErrorCallback err, const std::string& msg,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
+ poller),
callback(f), error(err), message(msg) {}
- void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
+ typename sys::PollableQueue<T>::Batch::const_iterator
+ handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
try {
- typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+ typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin();
while (i != values.end() && !this->isStopped()) {
callback(*i);
++i;
}
- values.erase(values.begin(), i);
+ return i;
}
catch (const std::exception& e) {
QPID_LOG(error, message << ": " << e.what());
- values.clear();
this->stop();
error();
+ return values.end();
}
}