diff options
author | Ted Ross <tross@apache.org> | 2008-11-04 23:01:57 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-04 23:01:57 +0000 |
commit | 55f763b930b3a9743d355df95b7fbd126fc35566 (patch) | |
tree | d9888c6c70826bf2f2c44e052db34f1597a11ac6 | |
parent | 0775a64f49729537a207c540b8d8a2973c155d56 (diff) | |
download | qpid-python-55f763b930b3a9743d355df95b7fbd126fc35566.tar.gz |
Added bank numbers to the routing key of a QMF heartbeat message.
This is used by the console to identify which agent sent the indication.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711458 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 2 | ||||
-rw-r--r-- | python/qpid/qmfconsole.py | 25 |
3 files changed, 25 insertions, 8 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index d752312843..1c737155a7 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -22,7 +22,6 @@ #include "qpid/management/ManagementObject.h" #include "ManagementAgentImpl.h" #include <list> -#include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> @@ -597,11 +596,12 @@ void ManagementAgentImpl::periodicProcessing() Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; contentSize = BUFSIZE - msgBuffer.available(); msgBuffer.reset(); - routingKey = "console.heartbeat"; - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); } moveNewObjectsLH(); diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 596b587401..1b89be18d9 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -351,7 +351,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.heartbeat"; + routingKey = "console.heartbeat.1.0"; sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index e173a4b864..55b0a7fd94 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -489,10 +489,21 @@ class Session: if self.console: self.console.methodResponse(broker, seq, result) - def _handleHeartbeatInd(self, broker, codec, seq): + def _handleHeartbeatInd(self, broker, codec, seq, msg): + brokerBank = 1 + agentBank = 0 + dp = msg.get("delivery_properties") + if dp: + key = dp["routing_key"] + keyElements = key.split(".") + if len(keyElements) == 4: + brokerBank = int(keyElements[2]) + agentBank = int(keyElements[3]) + + agent = broker.getAgent(brokerBank, agentBank) timestamp = codec.read_uint64() if self.console != None: - self.console.heartbeat(None, timestamp) + self.console.heartbeat(agent, timestamp) def _handleEventInd(self, broker, codec, seq): if self.console != None: @@ -1086,7 +1097,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.agents = {} - self.agents[0] = Agent(self, "1.0", "BrokerAgent") + self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1112,6 +1123,12 @@ class Broker: def getBrokerBank(self): return 1 + def getAgent(self, brokerBank, agentBank): + bankKey = "%d.%d" % (brokerBank, agentBank) + if bankKey in self.agents: + return self.agents[bankKey] + return None + def getSessionId(self): """ Get the identifier of the AMQP session to the broker """ return self.amqpSessionId @@ -1287,7 +1304,7 @@ class Broker: elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) |