summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-30 12:18:21 +0000
committerTed Ross <tross@apache.org>2010-03-30 12:18:21 +0000
commit555230c7a5fb9d5c9af0978f5400613e2533e68c (patch)
tree81e64e343cfddfbf4e9814a8052f3269e4254909
parent1ad3a5955e812965f50b4220fe7e857a64225055 (diff)
downloadqpid-python-555230c7a5fb9d5c9af0978f5400613e2533e68c.tar.gz
Methods (both styles) and Session-scope get queries now working for both V1 and V2.
Almost all of the tests pass. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929104 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py2
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.cpp2
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp33
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp5
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py399
5 files changed, 306 insertions, 135 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 30d7ba872d..27216db8e1 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1402,7 +1402,7 @@ class SchemaClass:
stream.write (" status = coreObject->ManagementMethod (METHOD_" +\
method.getName().upper() + ", ioArgs, text);\n")
- stream.write (" outMap[\"_status_code\"] = (status);\n")
+ stream.write (" outMap[\"_status_code\"] = (uint32_t) status;\n")
stream.write (" outMap[\"_status_text\"] = ::qpid::management::Manageable::StatusText(status, text);\n")
for arg in method.args:
if arg.getDir () == "O" or arg.getDir () == "IO":
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
index d1c7b0620c..62b10446dd 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
@@ -333,6 +333,6 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMapMethodArgs*/)
std::string text;
/*MGEN:Class.MapMethodHandlers*/
- outMap["_status_code"] = status;
+ outMap["_status_code"] = (uint32_t) status;
outMap["_status_text"] = Manageable::StatusText(status, text);
}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 77e591dd2e..298a549651 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -459,19 +459,21 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
qpid::messaging::Message outMsg;
qpid::messaging::MapContent outMap(outMsg);
+ outMap["_values"] = Variant::Map();
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID;
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
failed = true;
} else {
string methodName;
ObjectId objId;
Variant::Map inArgs;
+ Variant::Map callMap;
try {
- // coversions will throw if input is invalid.
+ // conversions will throw if input is invalid.
objId = ObjectId(oid->second.asMap());
methodName = mid->second.getString();
@@ -482,17 +484,29 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
- (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT;
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
failed = true;
} else {
+ iter->second->doMethod(methodName, inArgs, callMap);
+ }
- iter->second->doMethod(methodName, inArgs, outMap.asMap());
+ if (callMap["_status_code"].asUint32() == 0) {
+ outMap["_arguments"] = Variant::Map();
+ for (Variant::Map::const_iterator iter = callMap.begin();
+ iter != callMap.end(); iter++)
+ if (iter->first != "_status_code" && iter->first != "_status_text")
+ outMap["_arguments"].asMap()[iter->first] = iter->second;
+ } else {
+ (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"];
+ (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"];
+ failed = true;
}
- } catch(exception& e) {
+ } catch(messaging::InvalidConversion& e) {
outMap.clear();
- (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ outMap["_values"] = Variant::Map();
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION;
(outMap["_values"].asMap())["_status_text"] = e.what();
failed = true;
}
@@ -501,10 +515,13 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
Variant::Map headers;
headers["method"] = "response";
headers["qmf.agent"] = name_address;
- if (failed)
+ if (failed) {
headers["qmf.opcode"] = "_exception";
- else
+ QPID_LOG(trace, "SENT Exception map=" << outMap);
+ } else {
headers["qmf.opcode"] = "_method_response";
+ QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
+ }
outMap.encode();
connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 183763a417..ee40ba9594 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -188,11 +188,6 @@ void ObjectId::setV2Key(const ManagementObject& object)
// encode as V2-format map
void ObjectId::mapEncode(messaging::VariantMap& map) const
{
- if (agent == 0)
- map["_first"] = first;
- else
- map["_first"] = (first | agent->first);
-
map["_object_name"] = v2Key;
if (!agentName.empty())
map["_agent_name"] = agentName;
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 526eb49d24..e598068262 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -287,7 +287,7 @@ class Object(object):
if name == prop.name:
return value
if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+ deref = self._agent.getObjects(_objectId=value)
if len(deref) != 1:
return None
else:
@@ -321,10 +321,6 @@ class Object(object):
aIdx = 0
sendCodec = Codec()
seq = self._session.seqMgr._reserve((method, synchronous))
- self._broker._setHeader(sendCodec, 'M', seq)
- self._objectId.encode(sendCodec)
- self._schema.getKey().encode(sendCodec)
- sendCodec.write_str8(name)
count = 0
for arg in method.arguments:
@@ -333,24 +329,64 @@ class Object(object):
if count != len(args):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._session._encodeValue(sendCodec, args[aIdx], arg.type)
- aIdx += 1
- if timeWait:
- ttl = timeWait * 1000
+ if self._agent.isV2:
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = self._objectId.asMap()
+ call['_method_name'] = name
+ argMap = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ argMap[arg.name] = args[aIdx]
+ aIdx += 1
+ call['_arguments'] = argMap
+
+ dp = self._broker.amqpSession.delivery_properties()
+ dp.routing_key = self._objectId.getAgentBank()
+ mp = self._broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self._broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ smsg = Message(dp, mp, sendCodec.encoded)
+ exchange = "qmf.default.direct"
+
else:
- ttl = None
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
- ttl=ttl)
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank())
+ self._broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ self._schema.getKey().encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+ aIdx += 1
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ exchange = "qpid.management"
+
if synchronous:
try:
self._broker.cv.acquire()
self._broker.syncInFlight = True
finally:
self._broker.cv.release()
- self._broker._send(smsg)
+ self._broker._send(smsg, exchange)
return seq
return None
@@ -551,7 +587,9 @@ class Session:
self.brokers.append(broker)
if not self.manageConnections:
- self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
+ agent = broker.getAgent(1,0)
+ if agent:
+ agent.getObjects(_class="agent")
return broker
@@ -712,76 +750,17 @@ class Session:
if len(agentList) == 0:
return []
- pname = None
- cname = None
- hash = None
- classKey = None
- if "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
- elif "_key" in kwargs: classKey = kwargs["_key"]
- elif "_class" in kwargs:
- cname = kwargs["_class"]
- if "_package" in kwargs:
- pname = kwargs["_package"]
- if cname == None and classKey == None and "_objectId" not in kwargs:
- raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
- map = {}
- self.getSelect = []
- if "_objectId" in kwargs:
- map["_objectid"] = kwargs["_objectId"].__repr__()
- else:
- if cname == None:
- cname = classKey.getClassName()
- pname = classKey.getPackageName()
- hash = classKey.getHash()
- map["_class"] = cname
- if pname != None: map["_package"] = pname
- if hash != None: map["_hash"] = hash
- for item in kwargs:
- if item[0] != '_':
- self.getSelect.append((item, kwargs[item]))
-
- self.getResult = []
+ #
+ # We now have a list of agents to query, start the queries and gather the results.
+ #
+ request = SessionGetRequest(len(agentList))
for agent in agentList:
- broker = agent.broker
- sendCodec = Codec()
- try:
- self.cv.acquire()
- seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
- self.syncSequenceList.append(seq)
- finally:
- self.cv.release()
- broker._setHeader(sendCodec, 'G', seq)
- sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
- broker._send(smsg)
-
- starttime = time()
- timeout = False
- if "_timeout" in kwargs:
- waitTime = kwargs["_timeout"]
- else:
- waitTime = self.DEFAULT_GET_WAIT_TIME
- try:
- self.cv.acquire()
- while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(waitTime)
- if time() - starttime > waitTime:
- for pendingSeq in self.syncSequenceList:
- self.seqMgr._release(pendingSeq)
- self.syncSequenceList = []
- timeout = True
- finally:
- self.cv.release()
-
- if self.error:
- errorText = self.error
- self.error = None
- raise Exception(errorText)
-
- if len(self.getResult) == 0 and timeout:
- raise RuntimeError("No agent responded within timeout period")
- return self.getResult
+ agent.getObjects(request, **kwargs)
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ request.wait(timeout)
+ return request.result
def setEventFilter(self, **kwargs):
@@ -1179,14 +1158,10 @@ class Session:
schema = self.getSchema(schemaKey)
for method in schema.getMethods():
if name == method.name:
- aIdx = 0
- sendCodec = Codec()
- seq = self.seqMgr._reserve((method, False))
- broker._setHeader(sendCodec, 'M', seq)
- objectId.encode(sendCodec)
- schemaKey.encode(sendCodec)
- sendCodec.write_str8(name)
-
+ #
+ # Count the arguments supplied and validate that the number is what is expected
+ # based on the schema.
+ #
count = 0
for arg in method.arguments:
if arg.dir.find("I") != -1:
@@ -1194,18 +1169,106 @@ class Session:
if count != len(argList):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._encodeValue(sendCodec, argList[aIdx], arg.type)
- aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
- broker._send(smsg)
+ aIdx = 0
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve((method, False))
+
+ if objectId.isV2():
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = objectId.asMap()
+ call['_method_name'] = name
+ args = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ args[arg.name] = argList[aIdx]
+ aIdx += 1
+ call['_arguments'] = args
+
+ dp = broker.amqpSession.delivery_properties()
+ dp.routing_key = objectId.getAgentBank()
+ mp = broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ msg = Message(dp, mp, sendCodec.encoded)
+ broker._send(msg, "qmf.default.direct")
+
+ else:
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank())
+ broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ broker._setHeader(sendCodec, 'M', seq)
+ objectId.encode(sendCodec)
+ schemaKey.encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._encodeValue(sendCodec, argList[aIdx], arg.type)
+ aIdx += 1
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (objectId.getBrokerBank(), objectId.getAgentBank()))
+ broker._send(smsg)
return seq
return None
#===================================================================================================
+# SessionGetRequest
+#===================================================================================================
+class SessionGetRequest(object):
+ """
+ This class is used to track get-object queries at the Session level.
+ """
+ def __init__(self, agentCount):
+ self.agentCount = agentCount
+ self.result = []
+ self.cv = Condition()
+ self.waiting = True
+
+ def __call__(self, **kwargs):
+ """
+ Callable entry point for gathering collected objects.
+ """
+ try:
+ self.cv.acquire()
+ if 'qmf_object' in kwargs:
+ self.result.append(kwargs['qmf_object'])
+ elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs:
+ self.agentCount -= 1
+ if self.agentCount == 0:
+ self.waiting = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+
+ def wait(self, timeout):
+ starttime = time()
+ try:
+ self.cv.acquire()
+ while self.waiting:
+ if (time() - starttime) > timeout:
+ raise Exception("Timed out after %d seconds" % timeout)
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+#===================================================================================================
# SchemaCache
#===================================================================================================
class SchemaCache(object):
@@ -1572,7 +1635,7 @@ class ObjectId:
else:
first = constructor.read_uint64()
second = constructor.read_uint64()
- self.agentName = str((first & 0x0000FFFFF0000000) >> 28)
+ self.agentName = str((first & 0x000000000FFFFFFF) >> 28)
self.agentEpoch = (first & 0x0FFF000000000000) >> 48
self.objectName = str(second)
@@ -1600,6 +1663,9 @@ class ObjectId:
return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
self.getBrokerBank(), self.getAgentBank(), self.getObject())
+ def isV2(self):
+ return not self.agentName.isdigit()
+
def index(self):
return self.__repr__()
@@ -1622,11 +1688,11 @@ class ObjectId:
return self.getSequence() == 0
def encode(self, codec):
- first = self.agentEpoch << 48
+ first = (self.agentEpoch << 48) + (1 << 28)
second = 0
try:
- first += int(self.agentName) << 28
+ first += int(self.agentName)
except:
pass
@@ -1787,7 +1853,7 @@ class Broker:
def getAgent(self, brokerBank, agentBank):
""" Return the agent object associated with a particular broker and agent bank value."""
- bankKey = agentBank
+ bankKey = str(agentBank)
try:
self.cv.acquire()
if bankKey in self.agents:
@@ -1852,7 +1918,7 @@ class Broker:
try:
self.cv.acquire()
self.agents = {}
- self.agents[0] = Agent(self, 0, "BrokerAgent")
+ self.agents['0'] = Agent(self, 0, "BrokerAgent")
finally:
self.cv.release()
@@ -2331,9 +2397,18 @@ class Agent:
raise Exception("notifiable object must be callable")
#
+ # Isolate the selectors from the kwargs
+ #
+ selectors = {}
+ for key in kwargs:
+ value = kwargs[key]
+ if key[0] != '_':
+ selectors[key] = value
+
+ #
# Allocate a context to track this asynchronous request.
#
- context = RequestContext(self, notifiable)
+ context = RequestContext(self, notifiable, selectors)
sequence = self.seqMgr._reserve(context)
try:
self.lock.acquire()
@@ -2419,6 +2494,7 @@ class Agent:
code = codec.read_uint32()
text = codec.read_str16()
outArgs = {}
+ self.broker._clearSequence(seq)
pair = self.seqMgr._release(seq)
if pair == None:
return
@@ -2426,19 +2502,19 @@ class Agent:
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
+ outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
result = MethodResult(code, text, outArgs)
if synchronous:
try:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
finally:
- broker.cv.release()
+ self.broker.cv.release()
else:
- if self.console:
- self.console.methodResponse(broker, seq, result)
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
def _v1HandleEventInd(self, codec, seq):
@@ -2502,12 +2578,79 @@ class Agent:
context.signal()
- def _v2HandleMethodRsp(self, mp, ah, content):
- pass
+ def _v2HandleMethodResp(self, mp, ah, content):
+ """
+ Handle a QMFv2 method response from the agent
+ """
+ context = None
+ sequence = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+ result = MethodResult(0, 'OK', content['_arguments'])
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
def _v2HandleException(self, mp, ah, content):
- pass
+ """
+ Handle a QMFv2 exception
+ """
+ context = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+
+ code = 7
+ text = ""
+ if '_status_code' in content:
+ code = content['_status_code']
+ if '_status_text' in content:
+ text = content['_status_text']
+ else:
+ text = content
+
+ result = MethodResult(code, text, {})
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
def _v1SendGetQuery(self, sequence, kwargs):
@@ -2607,7 +2750,7 @@ class Agent:
"""
if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
- elif opcode == '_method_response': self._v2HandleMethodRsp(mp, ah, content)
+ elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content)
elif opcode == '_exception': self._v2HandleException(mp, ah, content)
@@ -2619,11 +2762,12 @@ class RequestContext(object):
This class tracks an asynchronous request sent to an agent.
TODO: Add logic for client-side selection and filtering deleted objects from get-queries
"""
- def __init__(self, agent, notifiable):
+ def __init__(self, agent, notifiable, selectors={}):
self.sequence = None
self.agent = agent
self.schemaCache = self.agent.schemaCache
self.notifiable = notifiable
+ self.selectors = selectors
self.startTime = time()
self.rawQueryResults = []
self.queryResults = []
@@ -2639,6 +2783,16 @@ class RequestContext(object):
def addV1QueryResult(self, data):
+ values = {}
+ for prop, val in data.getProperties():
+ values[prop.name] = val
+ for stat, val in data.getStatistics():
+ values[stat.name] = val
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
+
if self.notifiable:
self.notifiable(qmf_object=data)
else:
@@ -2646,6 +2800,11 @@ class RequestContext(object):
def addV2QueryResult(self, data):
+ values = data['_values']
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
self.rawQueryResults.append(data)