diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 4d54a837ca..2b12e4f54a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -65,12 +65,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect - ) + ), + deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); cpg.join(name); - // Start dispatching from the poller. + + deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } @@ -173,27 +175,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); - Buffer buf(static_cast<char*>(msg), msg_len); - Connection* connection; - uint8_t type = buf.getOctet(); - decodePtr(buf, connection); - if (connection == 0) { // Cluster controls - AMQFrame frame; - while (frame.decode(buf)) - if (!ClusterOperations(*this, from).invoke(frame)) - throw Exception("Invalid cluster control"); - } - else { // Connection data or control - boost::intrusive_ptr<Connection> c = - getConnection(ConnectionId(from, connection)); - if (type == DATA) - c->deliverBuffer(buf); - else { - AMQFrame frame; - while (frame.decode(buf)) - c->deliver(frame); - } - } + deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -203,6 +185,26 @@ void Cluster::deliver( } } +void Cluster::deliverEvent(const Event& e) { + Buffer buf(e); + if (e.getConnection().getConnectionPtr() == 0) { // Cluster control + AMQFrame frame; + while (frame.decode(buf)) + if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Connection data or control + boost::intrusive_ptr<Connection> c = getConnection(e.getConnection()); + if (e.getType() == DATA) + c->deliverBuffer(buf); + else { // control + AMQFrame frame; + while (frame.decode(buf)) + c->deliver(frame); + } + } +} + struct AddrList { const cpg_address* addrs; int count; |