diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 59 |
1 files changed, 40 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3a0d11b5d1..37b126f5a9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -66,7 +66,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpgDispatchHandle(cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write - boost::bind(&Cluster::disconnect, this, _1)) // disconnect + boost::bind(&Cluster::disconnect, this, _1) // disconnect + ), + deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); @@ -80,10 +82,10 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); + deliverQueue.start(poller); } -Cluster::~Cluster() { -} +Cluster::~Cluster() {} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { @@ -181,31 +183,50 @@ void Cluster::deliver( Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling. - ConnectionInterceptor* connection; + throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling. + void* connection; decodePtr(buf, connection); - QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - - if (!broker) { - QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); - return; - } - if (connection && from != self) // Look up shadow for remote connections - connection = getShadowConnection(from, connection); - - if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) - handleMethod(from, connection, *frame.getMethod()); - else - connection->deliver(frame); + deliverQueue.push(DeliveredFrame(frame, from, connection)); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster delivery: " << e.what()); + QPID_LOG(critical, "Error in cluster deliver: " << e.what()); assert(0); throw; } } +void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin, + const PollableQueue<DeliveredFrame>::iterator& end) +{ + for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) { + AMQFrame& frame(i->frame); + Id from(i->from); + ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection); + try { + QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); + + if (!broker) { + QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); + return; + } + if (connection && from != self) // Look up shadow for remote connections + connection = getShadowConnection(from, connection); + + if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) + handleMethod(from, connection, *frame.getMethod()); + else + connection->deliver(frame); + } + catch (const std::exception& e) { + // FIXME aconway 2008-01-30: exception handling. + QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what()); + assert(0); + throw; + } + } +} + // Handle cluster methods // FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { |