diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 79b76f68be..93625af948 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), handler(&joiningHandler), joiningHandler(*this), - memberHandler(*this) + memberHandler(*this), + mcastId() { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -109,22 +110,22 @@ void Cluster::leave() { } void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { - QPID_LOG(trace, "MCAST [" << self << "]: " << body); AMQFrame f(body); - Event e(CONTROL, ConnectionId(self, cptr), f.size()); + Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId); Buffer buf(e); f.encode(buf); + QPID_LOG(trace, "MCAST " << e << " " << body); mcastEvent(e); } -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { - Event e(DATA, connection, size); +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, size_t id) { + Event e(DATA, connection, size, id); memcpy(e.getData(), data, size); + QPID_LOG(trace, "MCAST " << e); mcastEvent(e); } void Cluster::mcastEvent(const Event& e) { - QPID_LOG(trace, "MCAST " << e); e.mcast(name, cpg); } @@ -166,12 +167,13 @@ void Cluster::deliver( try { MemberId from(nodeid, pid); Event e = Event::delivered(from, msg, msg_len); + // Process cluster controls immediately if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control Buffer buf(e); AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); + QPID_LOG(trace, "DLVR " << e << " " << frame); if (!handler->invoke(e.getConnectionId().getMember(), frame)) throw Exception(QPID_MSG("Invalid cluster control")); } @@ -189,17 +191,17 @@ void Cluster::deliver( void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - QPID_LOG(trace, "EXEC: " << e); boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); assert(connection); if (e.getType() == DATA) { + QPID_LOG(trace, "EXEC: " << e); connection->deliverBuffer(buf); } else { // control AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame); - connection->received(frame); + QPID_LOG(trace, "EXEC " << e << " " << frame); + connection->delivered(frame); } } } @@ -351,7 +353,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args* void Cluster::stopClusterNode(void) { - // FIXME aconway 2008-09-18: + // FIXME aconway 2008-09-18: mgmt QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } |