summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp14
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp18
-rw-r--r--extras/qmf/src/py/qmf/console.py40
3 files changed, 39 insertions, 33 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 0a1c07a232..35011db38e 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -339,8 +339,10 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["qmf.content"] = "_event";
headers["qmf.agent"] = name_address;
- MapCodec::encode(map_, content);
- connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str());
+ Variant::List list;
+ list.push_back(map_);
+ ListCodec::encode(list, content);
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -1165,10 +1167,10 @@ void ManagementAgentImpl::periodicProcessing()
void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
{
map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
- map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp);
+ map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
+ map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp);
}
void ManagementAgentImpl::ConnectionThread::run()
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 4bacb95c7b..07751f57ef 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -404,8 +404,10 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
string content;
- MapCodec::encode(map_, content);
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str());
+ Variant::List list_;
+ list_.push_back(map_);
+ ListCodec::encode(list_, content);
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
@@ -1000,9 +1002,9 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
+ map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
string content;
MapCodec::encode(map, content);
@@ -2017,9 +2019,9 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
headers["qmf.agent"] = name_address;
map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
+ map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
string content;
MapCodec::encode(map, content);
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index 15e12fbb6b..45a8a61fa0 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -1206,11 +1206,11 @@ class Session:
try:
agentName = ah["qmf.agent"]
values = content["_values"]
- timestamp = values["timestamp"]
- interval = values["heartbeat_interval"]
+ timestamp = values["_timestamp"]
+ interval = values["_heartbeat_interval"]
epoch = 0
- if 'epoch' in values:
- epoch = values['epoch']
+ if '_epoch' in values:
+ epoch = values['_epoch']
except Exception,e:
return
@@ -1239,7 +1239,7 @@ class Session:
agent.touch()
if self.rcvHeartbeats and self.console and agent:
self._heartbeatCallback(agent, timestamp)
- agent.update_schema_timestamp(values.get("schema_timestamp", 0))
+ agent.update_schema_timestamp(values.get("_schema_updated", 0))
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -2573,7 +2573,7 @@ class Broker(Thread):
for agent in to_notify:
self.session._delAgentCallback(agent)
- def _v2SendAgentLocate(self, predicate={}):
+ def _v2SendAgentLocate(self, predicate=[]):
"""
Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
"""
@@ -2581,13 +2581,13 @@ class Broker(Thread):
dp = self.amqpSession.delivery_properties()
dp.routing_key = "console.request.agent_locate"
mp = self.amqpSession.message_properties()
- mp.content_type = "amqp/map"
+ mp.content_type = "amqp/list"
mp.user_id = self.authUser
mp.app_id = "qmf2"
mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
sendCodec = Codec()
- sendCodec.write_map(predicate)
+ sendCodec.write_list(predicate)
msg = Message(dp, mp, sendCodec.encoded)
self._send(msg, "qmf.default.topic")
@@ -2855,7 +2855,7 @@ class Broker(Thread):
content = None
else:
content = None
-
+
if content != None:
##
## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
@@ -3368,6 +3368,9 @@ class Agent:
Handle a QMFv2 data indication from the agent. Note: called from context
of the Broker thread.
"""
+ if content.__class__ != list:
+ return
+
if mp.correlation_id:
try:
self.lock.acquire()
@@ -3384,8 +3387,6 @@ class Agent:
if "qmf.content" in ah:
kind = ah["qmf.content"]
if kind == "_data":
- if content.__class__ != list:
- return
for omap in content:
context.addV2QueryResult(omap)
context.processV2Data()
@@ -3393,14 +3394,15 @@ class Agent:
context.signal()
elif kind == "_event":
- event = Event(self, v2Map=content)
- if event.classKey is None or event.schema:
- # schema optional or present
- context.doEvent(event)
- else:
- # schema not optional and not present
- if context.addPendingEvent(event):
- self._v2SendSchemaRequest(event.classKey)
+ for omap in content:
+ event = Event(self, v2Map=omap)
+ if event.classKey is None or event.schema:
+ # schema optional or present
+ context.doEvent(event)
+ else:
+ # schema not optional and not present
+ if context.addPendingEvent(event):
+ self._v2SendSchemaRequest(event.classKey)
elif kind == "_schema_id":
for sid in content: