summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
1 files changed, 7 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0e1c049a9c..f8adb8ee98 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -20,6 +20,7 @@
#include "Connection.h"
#include "DumpClient.h"
#include "FailoverExchange.h"
+#include "ClusterQueueHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -97,11 +98,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
writeEstimate(writeEstimate_),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller),
- deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller),
+ deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller),
+ deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ sequence(0)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,6 +197,7 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
+ e.setSequence(sequence++);
if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e, l);
@@ -208,26 +211,6 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverEventQueue has events to process.
-void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
- try {
- for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
- events.clear();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
- leave();
- }
-}
-
-void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
- try {
- for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1));
- frames.clear();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
- leave();
- }
-}
-
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
@@ -243,7 +226,7 @@ void Cluster::deliveredEvent(const Event& e) {
if (e.getType() == CONTROL) {
AMQFrame frame;
while (frame.decode(buf)) {
- deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame));
+ deliverFrameQueue.push(EventFrame(connection, e, frame));
}
}
else if (e.getType() == DATA) {