From 3d4e75c60e892d2a0adf59b1f6d944eef58b343d Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 19 Mar 2010 15:14:02 +0000 Subject: Added functioning agent discovery, heartbeat, and age-out to C++ agent and Python console. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@925260 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 103 +++++++------ qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 16 +- qpid/extras/qmf/src/py/qmf/console.py | 185 ++++++++++++++++++++++-- 3 files changed, 236 insertions(+), 68 deletions(-) diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 547d71e165..b21e7105a3 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -242,7 +242,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["qmf.agent"] = name_address; content.encode(); - connThreadBody.sendBuffer(msg.getContent(), 0, + connThreadBody.sendBuffer(msg.getContent(), "", headers, "qmf.default.topic", key.str()); } @@ -264,7 +264,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { Mutex::ScopedUnlock unlock(agentLock); - invokeMethodRequest(item->body, item->sequence, item->replyTo); + invokeMethodRequest(item->body, item->cid, item->replyTo); delete item; } } @@ -353,8 +353,10 @@ void ManagementAgentImpl::sendHeartbeat() headers["qmf.agent"] = name_address; map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; content.encode(); - connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key); + connThreadBody.sendBuffer(msg.getContent(), "", headers, addr_exchange, addr_key); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } @@ -470,7 +472,7 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo) { string methodName; qpid::messaging::Message inMsg(body); @@ -521,10 +523,10 @@ void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t headers["qmf.agent"] = name_address; outMap.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, const string& cid, const string& replyTo) { FieldTable ft; FieldTable::ValuePtr value; @@ -565,11 +567,11 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["qmf.agent"] = name_address; content.encode(); - connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } - sendCommandComplete(replyTo, sequence); + //sendCommandComplete(replyTo, sequence); return; } @@ -600,21 +602,44 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["qmf.agent"] = name_address; content.encode(); - connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } } - sendCommandComplete(replyTo, sequence); + //sendCommandComplete(replyTo, sequence); } -void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + static const string addr_exchange("qmf.default.direct"); + + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +} + +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo) { if (extThread) { Mutex::ScopedLock lock(agentLock); - methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + methodQueue.push_back(new QueuedMethod(cid, replyTo, body)); if (pipeHandle != 0) { pipeHandle->write("X", 1); } else if (notifyable != 0) { @@ -633,7 +658,7 @@ void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t inCallback = false; } } else { - invokeMethodRequest(body, sequence, replyTo); + invokeMethodRequest(body, cid, replyTo); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -642,32 +667,22 @@ void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t void ManagementAgentImpl::received(Message& msg) { string replyToKey; - framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const framing::ReplyTo& rt = p.getReplyTo(); + framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const framing::ReplyTo& rt = mp.getReplyTo(); replyToKey = rt.getRoutingKey(); } - if (msg.getHeaders().getAsString("app_id") == "qmf2") + if (mp.hasAppId() && mp.getAppId() == "qmf2") { - uint32_t sequence = 0; - std::string opcode = msg.getHeaders().getAsString("qmf.opcode"); - std::string cid = msg.getMessageProperties().getCorrelationId(); - if (!cid.empty()) { - try { - sequence = boost::lexical_cast(cid); - } catch(const boost::bad_lexical_cast&) { - QPID_LOG(warning, "Bad correlation Id for received QMF request."); - return; - } - } + string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); + string cid = msg.getMessageProperties().getCorrelationId(); - if (opcode == "_method_request") { - handleMethodRequest(msg.getData(), sequence, replyToKey); - return; + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); + else { + QPID_LOG(trace, "Support for QMF Opcode [" << opcode << "] TBD!!!"); } - - QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; } @@ -684,7 +699,6 @@ void ManagementAgentImpl::received(Message& msg) if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); else if (opcode == 'M') QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!"); } @@ -886,17 +900,14 @@ void ManagementAgentImpl::periodicProcessing() content.encode(); const std::string &str = m.getContent(); if (str.length()) { - stringstream key; ::qpid::messaging::Variant::Map headers; - key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << - baseObject->getPackageName() << "." << baseObject->getClassName(); headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list"); - QPID_LOG(trace, "SENT DataIndication key=" << key.str()); + connThreadBody.sendBuffer(str, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); } } @@ -936,6 +947,10 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); + session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + arg::bindingKey=agent.name_address); + session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); QPID_LOG(info, "Connection established with broker"); @@ -1009,7 +1024,7 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, - uint32_t sequence, + const string& cid, const qpid::messaging::VariantMap headers, const string& exchange, const string& routingKey, @@ -1018,11 +1033,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, Message msg; qpid::messaging::VariantMap::const_iterator i; - if (sequence) { - std::stringstream seqstr; - seqstr << sequence; - msg.getMessageProperties().setCorrelationId(seqstr.str()); - } + if (!cid.empty()) + msg.getMessageProperties().setCorrelationId(cid); + if (!contentType.empty()) msg.getMessageProperties().setContentType(contentType); for (i = headers.begin(); i != headers.end(); ++i) { diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index f753395415..0c2d06a6c8 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -123,10 +123,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen }; struct QueuedMethod { - QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : - sequence(_seq), replyTo(_reply), body(_body) {} + QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) : + cid(_cid), replyTo(_reply), body(_body) {} - uint32_t sequence; + std::string cid; std::string replyTo; std::string body; }; @@ -205,7 +205,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen const std::string& exchange, const std::string& routingKey); void sendBuffer(const std::string& data, - const uint32_t sequence, + const std::string& cid, const qpid::messaging::VariantMap headers, const std::string& exchange, const std::string& routingKey, @@ -263,9 +263,11 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); - void invokeMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo); - void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); + + void handleGetQuery (qpid::framing::Buffer& inBuffer, const std::string& cid, const std::string& replyTo); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); }; diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index b663116f40..226d586f8b 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -352,7 +352,6 @@ class Object(object): raise Exception("Invalid Method (software defect) [%s]" % name) def _encodeUnmanaged(self, codec): - codec.write_uint8(20) codec.write_str8(self._schema.getKey().getPackageName()) codec.write_str8(self._schema.getKey().getClassName()) @@ -482,7 +481,7 @@ class Session: self.brokers.append(broker) if not self.manageConnections: - self.getObjects(broker=broker, _class="agent") + self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0)) return broker def delBroker(self, broker): @@ -873,6 +872,7 @@ class Session: timestamp = codec.read_uint64() if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) + broker._ageAgents() def _handleEventInd(self, broker, codec, seq): if self.console != None: @@ -927,6 +927,39 @@ class Session: if stat: self.console.objectStats(broker, object) + def _v2HandleHeartbeatInd(self, broker, mp, ah, content): + brokerBank = 1 + agentName = ah["qmf.agent"] + values = content["_values"] + timestamp = values["timestamp"] + interval = values["heartbeat_interval"] + if agentName == None: + return + agent = broker.getAgent(brokerBank, agentName) + if agent == None: + agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) + broker._addAgent(agentName, agent) + else: + agent.touch() + if self.console and agent: + self.console.heartbeat(agent, timestamp) + broker._ageAgents() + + def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): + self._v2HandleHeartbeatInd(broker, mp, ah, content) + + def _v2HandleDataInd(self, broker, mp, ah, content): + pass + + def _v2HandleQueryRsp(self, broker, mp, ah, content): + pass + + def _v2HandleMethodRsp(self, broker, mp, ah, content): + pass + + def _v2HandleException(self, broker, mp, ah, content): + pass + def _handleError(self, error): try: self.cv.acquire() @@ -1557,6 +1590,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.cv = Condition() + self.agentLock = Lock() self.error = None self.brokerId = None self.connected = False @@ -1590,8 +1624,12 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" bankKey = (brokerBank, agentBank) - if bankKey in self.agents: - return self.agents[bankKey] + try: + self.agentLock.acquire() + if bankKey in self.agents: + return self.agents[bankKey] + finally: + self.agentLock.release() return None def getSessionId(self): @@ -1600,7 +1638,11 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ - return self.agents.values() + try: + self.agentLock.acquire() + return self.agents.values() + finally: + self.agentLock.release() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ @@ -1629,8 +1671,13 @@ class Broker: def _tryToConnect(self): try: - self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + try: + self.agentLock.acquire() + self.agents = {} + self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + finally: + self.agentLock.release() + self.topicBound = False self.syncInFlight = False self.syncRequest = 0 @@ -1679,6 +1726,24 @@ class Broker: self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) + ## + ## Set up connectivity for QMFv2 + ## + self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="qmf.default.direct", + queue=self.v2_queue_name, binding_key=self.v2_queue_name) + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_queue_name, binding_key="agent.#") + ## Other bindings here... + self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1) + self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL) + self.connected = True self.session._handleBrokerConnect(self) @@ -1686,6 +1751,7 @@ class Broker: self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) + self._v2SendAgentLocate() except socket.error, e: self.error = "Socket Error %s - %s" % (e.__class__.__name__, e) @@ -1699,17 +1765,65 @@ class Broker: def _updateAgent(self, obj): bankKey = (obj.brokerBank, obj.agentBank) + agent = None if obj._deleteTime == 0: - if bankKey not in self.agents: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent - if self.session.console != None: - self.session.console.newAgent(agent) + try: + self.agentLock.acquire() + if bankKey not in self.agents: + agent = Agent(self, obj.agentBank, obj.label) + self.agents[bankKey] = agent + finally: + self.agentLock.release() + if agent and self.session.console: + self.session.console.newAgent(agent) else: - agent = self.agents.pop(bankKey, None) - if agent != None and self.session.console != None: + try: + self.agentLock.acquire() + agent = self.agents.pop(bankKey, None) + finally: + self.agentLock.release() + if agent and self.session.console: self.session.console.delAgent(agent) + def _addAgent(self, name, agent): + try: + self.agentLock.acquire() + self.agents[(1, name)] = agent + finally: + self.agentLock.release() + if self.session.console: + self.session.console.newAgent(agent) + + def _ageAgents(self): + try: + self.agentLock.acquire() + to_delete = [] + to_notify = [] + for key in self.agents: + if self.agents[key].isOld(): + to_delete.append(key) + for key in to_delete: + to_notify.append(self.agents.pop(key, None)) + finally: + self.agentLock.release() + if self.session.console: + for agent in to_notify: + self.session.console.delAgent(agent) + + def _v2SendAgentLocate(self, predicate={}): + dp = self.amqpSession.delivery_properties() + dp.routing_key = "console.request.agent_locate" + mp = self.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.authUser + mp.app_id = "qmf2" + mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_agent_locate_request'} + sendCodec = Codec() + sendCodec.write_map(predicate) + msg = Message(dp, mp, sendCodec.encoded) + self._send(msg, "qmf.default.topic") + def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ codec.write_uint8(ord('A')) @@ -1819,6 +1933,28 @@ class Broker: self.session.receiver._completed.add(msg.id) self.session.channel.session_completed(self.session.receiver._completed) + def _v2Cb(self, msg): + dp = msg.get("delivery_properties") + mp = msg.get("message_properties") + ah = mp["application_headers"] + opcode = ah["qmf.opcode"] + codec = Codec(msg.body) + + if mp.content_type == "amqp/list": + content = codec.read_list() + elif mp.content_type == "amqp/map": + content = codec.read_map() + else: + return + + if opcode == None: return + elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + elif opcode == '_data_indication': self.session._v2HandleDataInd(self, mp, ah, content) + elif opcode == '_query_response': self.session._v2HandleQueryRsp(self, mp, ah, content) + elif opcode == '_method_response': self.session._v2HandleMethodRsp(self, mp, ah, content) + elif opcode == '_exception': self.session._v2HandleException(self, mp, ah, content) + def _exceptionCb(self, data): self.connected = False self.error = data @@ -1835,14 +1971,31 @@ class Broker: class Agent: """ """ - def __init__(self, broker, agentBank, label): + def __init__(self, broker, agentBank, label, isV2=False, interval=0): self.broker = broker self.brokerBank = broker.getBrokerBank() self.agentBank = agentBank self.label = label + self.isV2 = isV2 + self.heartbeatInterval = interval + self.lastSeenTime = time() + + def touch(self): + self.lastSeenTime = time() + + def isOld(self): + if self.heartbeatInterval == 0: + return None + if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval): + return True + return None def __repr__(self): - return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) + if self.isV2: + ver = "v2" + else: + ver = "v1" + return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) def getBroker(self): return self.broker -- cgit v1.2.1