diff options
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 18 | ||||
-rw-r--r-- | extras/qmf/src/py/qmf/console.py | 40 |
3 files changed, 39 insertions, 33 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 0a1c07a232..35011db38e 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -339,8 +339,10 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["qmf.content"] = "_event"; headers["qmf.agent"] = name_address; - MapCodec::encode(map_, content); - connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str()); + Variant::List list; + list.push_back(map_); + ListCodec::encode(list, content); + connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list"); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -1165,10 +1167,10 @@ void ManagementAgentImpl::periodicProcessing() void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map) { map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp); + map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now())); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; + map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp); } void ManagementAgentImpl::ConnectionThread::run() diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4bacb95c7b..07751f57ef 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -404,8 +404,10 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi string content; - MapCodec::encode(map_, content); - sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str()); + Variant::List list_; + list_.push_back(map_); + ListCodec::encode(list_, content); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -1000,9 +1002,9 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; string content; MapCodec::encode(map, content); @@ -2017,9 +2019,9 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; string content; MapCodec::encode(map, content); diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 15e12fbb6b..45a8a61fa0 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -1206,11 +1206,11 @@ class Session: try: agentName = ah["qmf.agent"] values = content["_values"] - timestamp = values["timestamp"] - interval = values["heartbeat_interval"] + timestamp = values["_timestamp"] + interval = values["_heartbeat_interval"] epoch = 0 - if 'epoch' in values: - epoch = values['epoch'] + if '_epoch' in values: + epoch = values['_epoch'] except Exception,e: return @@ -1239,7 +1239,7 @@ class Session: agent.touch() if self.rcvHeartbeats and self.console and agent: self._heartbeatCallback(agent, timestamp) - agent.update_schema_timestamp(values.get("schema_timestamp", 0)) + agent.update_schema_timestamp(values.get("_schema_updated", 0)) def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): @@ -2573,7 +2573,7 @@ class Broker(Thread): for agent in to_notify: self.session._delAgentCallback(agent) - def _v2SendAgentLocate(self, predicate={}): + def _v2SendAgentLocate(self, predicate=[]): """ Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. """ @@ -2581,13 +2581,13 @@ class Broker(Thread): dp = self.amqpSession.delivery_properties() dp.routing_key = "console.request.agent_locate" mp = self.amqpSession.message_properties() - mp.content_type = "amqp/map" + mp.content_type = "amqp/list" mp.user_id = self.authUser mp.app_id = "qmf2" mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_agent_locate_request'} sendCodec = Codec() - sendCodec.write_map(predicate) + sendCodec.write_list(predicate) msg = Message(dp, mp, sendCodec.encoded) self._send(msg, "qmf.default.topic") @@ -2855,7 +2855,7 @@ class Broker(Thread): content = None else: content = None - + if content != None: ## ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are @@ -3368,6 +3368,9 @@ class Agent: Handle a QMFv2 data indication from the agent. Note: called from context of the Broker thread. """ + if content.__class__ != list: + return + if mp.correlation_id: try: self.lock.acquire() @@ -3384,8 +3387,6 @@ class Agent: if "qmf.content" in ah: kind = ah["qmf.content"] if kind == "_data": - if content.__class__ != list: - return for omap in content: context.addV2QueryResult(omap) context.processV2Data() @@ -3393,14 +3394,15 @@ class Agent: context.signal() elif kind == "_event": - event = Event(self, v2Map=content) - if event.classKey is None or event.schema: - # schema optional or present - context.doEvent(event) - else: - # schema not optional and not present - if context.addPendingEvent(event): - self._v2SendSchemaRequest(event.classKey) + for omap in content: + event = Event(self, v2Map=omap) + if event.classKey is None or event.schema: + # schema optional or present + context.doEvent(event) + else: + # schema not optional and not present + if context.addPendingEvent(event): + self._v2SendSchemaRequest(event.classKey) elif kind == "_schema_id": for sid in content: |