summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp24
1 files changed, 13 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 79b76f68be..93625af948 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
handler(&joiningHandler),
joiningHandler(*this),
- memberHandler(*this)
+ memberHandler(*this),
+ mcastId()
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -109,22 +110,22 @@ void Cluster::leave() {
}
void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
- QPID_LOG(trace, "MCAST [" << self << "]: " << body);
AMQFrame f(body);
- Event e(CONTROL, ConnectionId(self, cptr), f.size());
+ Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId);
Buffer buf(e);
f.encode(buf);
+ QPID_LOG(trace, "MCAST " << e << " " << body);
mcastEvent(e);
}
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
- Event e(DATA, connection, size);
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, size_t id) {
+ Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
+ QPID_LOG(trace, "MCAST " << e);
mcastEvent(e);
}
void Cluster::mcastEvent(const Event& e) {
- QPID_LOG(trace, "MCAST " << e);
e.mcast(name, cpg);
}
@@ -166,12 +167,13 @@ void Cluster::deliver(
try {
MemberId from(nodeid, pid);
Event e = Event::delivered(from, msg, msg_len);
+
// Process cluster controls immediately
if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
Buffer buf(e);
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
+ QPID_LOG(trace, "DLVR " << e << " " << frame);
if (!handler->invoke(e.getConnectionId().getMember(), frame))
throw Exception(QPID_MSG("Invalid cluster control"));
}
@@ -189,17 +191,17 @@ void Cluster::deliver(
void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- QPID_LOG(trace, "EXEC: " << e);
boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
assert(connection);
if (e.getType() == DATA) {
+ QPID_LOG(trace, "EXEC: " << e);
connection->deliverBuffer(buf);
}
else { // control
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
- connection->received(frame);
+ QPID_LOG(trace, "EXEC " << e << " " << frame);
+ connection->delivered(frame);
}
}
}
@@ -351,7 +353,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*
void Cluster::stopClusterNode(void)
{
- // FIXME aconway 2008-09-18:
+ // FIXME aconway 2008-09-18: mgmt
QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}