summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp59
1 files changed, 40 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3a0d11b5d1..37b126f5a9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -66,7 +66,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpgDispatchHandle(cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
- boost::bind(&Cluster::disconnect, this, _1)) // disconnect
+ boost::bind(&Cluster::disconnect, this, _1) // disconnect
+ ),
+ deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
@@ -80,10 +82,10 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
+ deliverQueue.start(poller);
}
-Cluster::~Cluster() {
-}
+Cluster::~Cluster() {}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -181,31 +183,50 @@ void Cluster::deliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
- ConnectionInterceptor* connection;
+ throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
+ void* connection;
decodePtr(buf, connection);
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
- if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
- return;
- }
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ deliverQueue.push(DeliveredFrame(frame, from, connection));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+ QPID_LOG(critical, "Error in cluster deliver: " << e.what());
assert(0);
throw;
}
}
+void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+ const PollableQueue<DeliveredFrame>::iterator& end)
+{
+ for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame(i->frame);
+ Id from(i->from);
+ ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ try {
+ QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
+
+ if (!broker) {
+ QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ return;
+ }
+ if (connection && from != self) // Look up shadow for remote connections
+ connection = getShadowConnection(from, connection);
+
+ if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+ handleMethod(from, connection, *frame.getMethod());
+ else
+ connection->deliver(frame);
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+ assert(0);
+ throw;
+ }
+ }
+}
+
// Handle cluster methods
// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {