diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 |
1 files changed, 7 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0e1c049a9c..f8adb8ee98 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -20,6 +20,7 @@ #include "Connection.h" #include "DumpClient.h" #include "FailoverExchange.h" +#include "ClusterQueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -97,11 +98,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b writeEstimate(writeEstimate_), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller), - deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller), + deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), + deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + sequence(0) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,6 +197,7 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + e.setSequence(sequence++); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); @@ -208,26 +211,6 @@ void Cluster::deliver(const Event& e, Lock&) { } // Entry point: called when deliverEventQueue has events to process. -void Cluster::deliveredEvents(PollableEventQueue::Queue& events) { - try { - for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); - events.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - -void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) { - try { - for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1)); - frames.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); Buffer buf(const_cast<char*>(e.getData()), e.getSize()); @@ -243,7 +226,7 @@ void Cluster::deliveredEvent(const Event& e) { if (e.getType() == CONTROL) { AMQFrame frame; while (frame.decode(buf)) { - deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame)); + deliverFrameQueue.push(EventFrame(connection, e, frame)); } } else if (e.getType() == DATA) { |