diff options
author | Ted Ross <tross@apache.org> | 2009-12-22 21:46:01 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-12-22 21:46:01 +0000 |
commit | 1b4a13aa9781c3f72b2b53abfbce8528de67ad16 (patch) | |
tree | edf46890480bfa367eaa1714e88d4ab0059aa1b2 | |
parent | 6bd740528732eb8da6f521be49999a37eb5f44ee (diff) | |
download | qpid-python-1b4a13aa9781c3f72b2b53abfbce8528de67ad16.tar.gz |
QPID-2261 - Ken G's patch applied to the qmfv2 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@893326 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 183 | ||||
-rw-r--r-- | qpid/python/qmf/qmfCommon.py | 532 | ||||
-rw-r--r-- | qpid/python/qmf/qmfConsole.py | 282 | ||||
-rw-r--r-- | qpid/python/qmf/test/agent_test.py | 33 | ||||
-rw-r--r-- | qpid/python/qmf/test/console_test.py | 18 |
5 files changed, 740 insertions, 308 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index f211cdb383..4b76263903 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -18,15 +18,16 @@ # import sys -import socket -import os import logging +import datetime +import time from threading import Thread, Lock -from qpid.messaging import Connection, Message +from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject, - parseSubject, OpCode, Query, SchemaObjectClass, _doQuery) + parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, + QmfData) @@ -36,7 +37,8 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, class Agent(Thread): def __init__(self, vendor, product, name=None, - notifier=None, kwargs={}): + notifier=None, heartbeat_interval=30, + kwargs={}): Thread.__init__(self) self._running = False self.vendor = vendor @@ -48,10 +50,13 @@ class Agent(Thread): self._id = AgentId(self.vendor, self.product, self.name) self._address = str(self._id) self._notifier = notifier + self._heartbeat_interval = heartbeat_interval self._conn = None + self._session = None self._lock = Lock() - self._data_schema = {} - self._event_schema = {} + self._packages = {} + self._schema_timestamp = long(0) + self._schema = {} self._agent_data = {} def getAgentId(self): @@ -60,8 +65,9 @@ class Agent(Thread): def setConnection(self, conn): self._conn = conn self._session = self._conn.session() - self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE) - self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address) + self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10) + self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address, + capacity=10) self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) self._running = True self.start() @@ -77,7 +83,16 @@ class Agent(Thread): self._lock.acquire() try: - self._data_schema[schema.getClassId()] = schema + classId = schema.getClassId() + pname = classId.getPackageName() + cname = classId.getClassName() + if pname not in self._packages: + self._packages[pname] = [cname] + else: + if cname not in self._packages[pname]: + self._packages[pname].append(cname) + self._schema[classId] = schema + self._schema_timestamp = long(time.time() * 1000) finally: self._lock.release() @@ -128,35 +143,41 @@ class Agent(Thread): def run(self): - count = 0 # @todo: hack + next_heartbeat = datetime.datetime.utcnow() while self._running: + + now = datetime.datetime.utcnow() + print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) + if now >= next_heartbeat: + self._ind_sender.send(self._makeAgentIndMsg()) + logging.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds + print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) try: - msg = self._locate_receiver.fetch(1) - logging.debug("Agent Locate Rcvd: '%s'" % msg) - if msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) + logging.error("waiting for next rcvr (timeout=%s)..." % timeout) + self._session.next_receiver(timeout = timeout) + except Empty: + pass except KeyboardInterrupt: break - except: + + try: + msg = self._locate_receiver.fetch(timeout = 0) + if msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) + except Empty: pass try: - msg = self._direct_receiver.fetch(1) - logging.debug("Agent Msg Rcvd: '%s'" % msg) + msg = self._direct_receiver.fetch(timeout = 0) if msg.content_type == "amqp/map": self._dispatch(msg, _direct=True) - except KeyboardInterrupt: - break - except: + except Empty: pass - # @todo: actually implement the periodic agent-ind - # message generation! - count+= 1 - if count == 5: - count = 0 - self._ind_sender.send(self._makeAgentIndMsg()) - logging.debug("Agent Indication Sent") + # # Private: @@ -166,11 +187,11 @@ class Agent(Thread): """ Create an agent indication message identifying this agent """ + _map = self.getAgentId().mapEncode() + _map["schemaTimestamp"] = self._schema_timestamp return Message( subject=makeSubject(OpCode.agent_ind), properties={"method":"response"}, - content={Query._TARGET_AGENT_ID: - self.getAgentId().mapEncode()}) - + content={MsgKey.agent_info: _map}) def _dispatch(self, msg, _direct=False): @@ -195,7 +216,7 @@ class Agent(Thread): if opcode == OpCode.agent_locate: self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.get_query: - logging.warning("!!! GET_QUERY TBD !!!") + self._handleQueryMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.method_req: logging.warning("!!! METHOD_REQ TBD !!!") elif opcode == OpCode.cancel_subscription: @@ -220,17 +241,9 @@ class Agent(Thread): reply = True if "method" in props and props["method"] == "request": - if "query" in cmap: - query = cmap["query"] - # is the query an agent locate? - if Query._TARGET in query and query[Query._TARGET] == {Query._TARGET_AGENT_ID:None}: - if Query._PREDICATE in query: - # does this agent match the predicate? - reply = _doQuery( query[Query._PREDICATE], self.getAgentId().mapEncode() ) - else: - reply = False - logging.debug("Ignoring query - not an agent-id query: '%s'" % query) - reply=True + if MsgKey.query in cmap: + agentIdMap = self.getAgentId().mapEncode() + reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap)) if reply: try: @@ -239,12 +252,92 @@ class Agent(Thread): m.correlation_id = msg.correlation_id tmp_snd.send(m) logging.debug("agent-ind sent to [%s]" % msg.reply_to) - except: - logging.error("Failed to send reply to agent-ind msg '%s'" % msg) + except SendError, e: + logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e))) else: logging.debug("agent-locate msg not mine - no reply sent") + def _handleQueryMsg(self, msg, cmap, props, version, _direct ): + """ + Handle received query message + """ + logging.debug("_handleQueryMsg") + + if "method" in props and props["method"] == "request": + qmap = cmap.get(MsgKey.query) + if qmap: + query = QmfQuery(qmap) + target = query.getTarget() + if target == QmfQuery._TARGET_PACKAGES: + self._queryPackages( msg, query ) + elif target == QmfQuery._TARGET_SCHEMA_ID: + self._querySchemaId( msg, query ) + elif target == QmfQuery._TARGET_SCHEMA: + logging.warning("!!! Query TARGET=SCHEMA TBD !!!") + #self._querySchema( query.getPredicate(), _idOnly=False ) + elif target == QmfQuery._TARGET_AGENT: + logging.warning("!!! Query TARGET=AGENT TBD !!!") + elif target == QmfQuery._TARGET_OBJECT_ID: + logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!") + elif target == QmfQuery._TARGET_OBJECT: + logging.warning("!!! Query TARGET=OBJECT TBD !!!") + else: + logging.warning("Unrecognized query target: '%s'" % str(target)) + + + def _queryPackages(self, msg, query): + """ + Run a query against the list of known packages + """ + pnames = [] + self._lock.acquire() + try: + for name in self._packages.iterkeys(): + if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})): + pnames.append(name) + finally: + self._lock.release() + + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content={MsgKey.package_info: pnames} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("package_info sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + def _querySchemaId( self, msg, query ): + """ + """ + schemas = [] + self._lock.acquire() + try: + for schemaId in self._schema.iterkeys(): + if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})): + schemas.append(schemaId.mapEncode()) + finally: + self._lock.release() + + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content={MsgKey.schema_id: schemas} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("schema_id sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + ##============================================================================== ## OBJECTS diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index daa5cea2fd..6e79b1c3f4 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -16,9 +16,6 @@ # specific language governing permissions and limitations # under the License. # -import sys -import socket -import os import time import logging from threading import Lock @@ -47,6 +44,13 @@ AMQP_QMF_SUBJECT = "qmf" AMQP_QMF_VERSION = 4 AMQP_QMF_SUBJECT_FMT = "%s%d.%s" +class MsgKey(object): + agent_info = "agent_info" + query = "query" + package_info = "package_info" + schema_id = "schema_id" + + class OpCode(object): noop = "noop" @@ -54,7 +58,7 @@ class OpCode(object): agent_locate = "agent-locate" cancel_subscription = "cancel-subscription" create_subscription = "create-subscription" - get_query = "get_query" + get_query = "get-query" method_req = "method" renew_subscription = "renew-subscription" schema_query = "schema-query" # @todo: deprecate @@ -316,6 +320,9 @@ class QmfData(object): def getProperty(self, _name): return self._properties[_name] + def hasProperty(self, _name): + return _name in self._properties + def setProperty(self, _name, _value): if self._const: raise Exception("cannot modify constant data object") @@ -332,15 +339,11 @@ class QmfData(object): # ignore private data members if _name[0] == '_': return super.__setattr__(self, _name, _value) - # @todo: this is bad - what if the same name is used for a - # property and statistic or argument? if _name in self._properties: return self.setProperty(_name, _value) return super.__setattr__(self, _name, _value) def __getattr__(self, _name): - # @todo: this is bad - what if the same name is used for a - # property and statistic or argument? if _name in self._properties: return self.getProperty(_name) raise AttributeError("no item named '%s' in this object" % _name) @@ -453,6 +456,32 @@ class QmfDescribed(QmfData): return result + def getProperty(self, name): + # meta-properties + if name == QmfQuery._PRED_PACKAGE: + return self._schemaId.getClassId().getPackageName() + if name == QmfQuery._PRED_CLASS: + return self._schemaId.getClassId().getClassName() + if name == QmfQuery._PRED_TYPE: + return self._schemaId.getClassId().getType() + if name == QmfQuery._PRED_HASH: + return self._schemaId.getClassId().getHashString() + if name == QmfQuery._PRED_SCHEMA_ID: + return self._schemaId.getClassId() + if name == QmfQuery._PRED_PRIMARY_KEY: + return self.getPrimaryKey() + + return super(QmfDescribed, self).getProperty(name) + + + def hasProperty(self, name): + if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS, QmfQuery._PRED_TYPE, + QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID, QmfQuery._PRED_PRIMARY_KEY]: + return True + + return super(QmfDescribed, self).hasProperty(name) + + def mapEncode(self): _map = {} _map["schema_id"] = self._schemaId.mapEncode() @@ -553,6 +582,29 @@ class QmfManaged(QmfDescribed): def isDeleted(self): return self._timestamps[QmfManaged._ts_delete] == 0 + + def getProperty(self, name): + # meta-properties + if name == QmfQuery._PRED_OBJECT_ID: + return self.getObjectId() + if name == QmfQuery._PRED_UPDATE_TS: + return self._timestamps[QmfManaged._ts_update] + if name == QmfQuery._PRED_CREATE_TS: + return self._timestamps[QmfManaged._ts_create] + if name == QmfQuery._PRED_DELETE_TS: + return self._timestamps[QmfManaged._ts_delete] + + return super(QmfManaged, self).getProperty(name) + + + def hasProperty(self, name): + if name in [QmfQuery._PRED_OBJECT_ID, QmfQuery._PRED_UPDATE_TS, + QmfQuery._PRED_CREATE_TS, QmfQuery._PRED_DELETE_TS]: + return True + + return super(QmfManaged, self).hasProperty(name) + + def mapEncode(self): _map = super(QmfManaged, self).mapEncode() _map["agent_id"] = self._agentId.mapEncode() @@ -999,110 +1051,110 @@ class MethodResponse(object): -def _doQuery(predicate, params ): - """ - Given the predicate from a query, and a map of named parameters, apply the predicate - to the parameters, and return True or False. - """ - if type(predicate) != list or len(predicate) < 1: - return False +# def _doQuery(predicate, params ): +# """ +# Given the predicate from a query, and a map of named parameters, apply the predicate +# to the parameters, and return True or False. +# """ +# if type(predicate) != list or len(predicate) < 1: +# return False - opr = predicate[0] - if opr == Query._CMP_TRUE: - logging.info("_doQuery() TRUE: [%s]" % predicate ) - return True - elif opr == Query._CMP_FALSE: - logging.info("_doQuery() FALSE: [%s]" % predicate ) - return False - elif opr == Query._LOGIC_AND: - logging.info("_doQuery() AND: [%s]" % predicate ) - rc = False - for exp in predicate[1:]: - rc = _doQuery( exp, params ) - if not rc: - break - return rc - - elif opr == Query._LOGIC_OR: - logging.info("_doQuery() OR: [%s]" % predicate ) - rc = False - for exp in predicate[1:]: - rc = _doQuery( exp, params ) - if rc: - break - return rc - - elif opr == Query._LOGIC_NOT: - logging.info("_doQuery() NOT: [%s]" % predicate ) - if len(predicate) != 2: - logging.warning("Malformed query not-expression received: '%s'" % predicate) - return False - return not _doQuery( predicate[1:], params ) +# elif opr == Query._LOGIC_AND: +# logging.debug("_doQuery() AND: [%s]" % predicate ) +# rc = False +# for exp in predicate[1:]: +# rc = _doQuery( exp, params ) +# if not rc: +# break +# return rc - elif opr in [Query._CMP_EQ, Query._CMP_NE, Query._CMP_LT, - Query._CMP_LE, Query._CMP_GT, Query._CMP_GE, - Query._CMP_RE]: - if len(predicate) != 3: - logging.warning("Malformed query compare expression received: '%s'" % predicate) - return False - # @todo: support regular expression match - name = predicate[1] - if name not in params: - logging.warning("Malformed query, attribute '%s' not present." % name) - return False - arg1 = params[name] - arg1Type = type(arg1) - logging.info("_doQuery() CMP: [%s] value='%s'" % (predicate, arg1) ) - try: - arg2 = arg1Type(predicate[2]) - if opr == Query._CMP_EQ: return arg1 == arg2 - if opr == Query._CMP_NE: return arg1 != arg2 - if opr == Query._CMP_LT: return arg1 < arg2 - if opr == Query._CMP_LE: return arg1 <= arg2 - if opr == Query._CMP_GT: return arg1 > arg2 - if opr == Query._CMP_GE: return arg1 >= arg2 - if opr == Query._CMP_RE: - logging.error("!!! RE QUERY TBD !!!") - return False - except: - logging.warning("Malformed query, unable to compare '%s'" % predicate) - return False +# elif opr == Query._LOGIC_OR: +# logging.debug("_doQuery() OR: [%s]" % predicate ) +# rc = False +# for exp in predicate[1:]: +# rc = _doQuery( exp, params ) +# if rc: +# break +# return rc + +# elif opr == Query._LOGIC_NOT: +# logging.debug("_doQuery() NOT: [%s]" % predicate ) +# if len(predicate) != 2: +# logging.warning("Malformed query not-expression received: '%s'" % predicate) +# return False +# return not _doQuery( predicate[1:], params ) - elif opr == Query._CMP_PRESENT: - logging.info("_doQuery() PRESENT: [%s]" % predicate ) - if len(predicate) != 2: - logging.warning("Malformed query present expression received: '%s'" % predicate) - return False - name = predicate[1] - return name in params - else: - logging.warning("Unknown query operator received: '%s'" % opr) - return False +# else: +# logging.warning("Unknown query operator received: '%s'" % opr) +# return False -class Query: + +class QmfQuery(object): + _TARGET="what" _PREDICATE="where" - _TARGET_PACKAGES="_packages" - _TARGET_OBJECT_ID="_object_id" - _TARGET_SCHEMA="_schema" - _TARGET_SCHEMA_ID="_schema_id" - _TARGET_MGT_DATA="_mgt_data" - _TARGET_AGENT_ID="_agent_id" + #### Query Targets #### + _TARGET_PACKAGES="schema_package" + # (returns just package names) + # predicate key(s): + # + #_PRED_PACKAGE + + + _TARGET_SCHEMA_ID="schema_id" + _TARGET_SCHEMA="schema" + # predicate key(s): + # + #_PRED_PACKAGE + #_PRED_CLASS + #_PRED_TYPE + #_PRED_HASH + #_PRED_SCHEMA_ID + # name of property (exist test only) + # name of method (exist test only) + + + _TARGET_AGENT="agent" + # predicate keys(s): + # + #_PRED_VENDOR="_vendor" + #_PRED_PRODUCT="_product" + #_PRED_NAME="_name" + + _TARGET_OBJECT_ID="object_id" + _TARGET_OBJECT="object" + # package and class names must be suppled in the target value: + # predicate on all values or meta-values[tbd] + # + #_PRED_PACKAGE + #_PRED_CLASS + #_PRED_TYPE + #_PRED_HASH + #_primary_key + #_PRED_SCHEMA_ID + #_PRED_OBJECT_ID + #_PRED_UPDATE_TS + #_PRED_CREATE_TS + #_PRED_DELETE_TS + #<name of property> _PRED_PACKAGE="_package_name" _PRED_CLASS="_class_name" _PRED_TYPE="_type" - _PRED_HASH="_has_str" - _PRED_SCHEMA_ID="_schema_id" + _PRED_HASH="_hash_str" _PRED_VENDOR="_vendor" _PRED_PRODUCT="_product" _PRED_NAME="_name" - _PRED_AGENT_ID="_agent_id" _PRED_PRIMARY_KEY="_primary_key" + _PRED_SCHEMA_ID="_schema_id" + _PRED_OBJECT_ID="_object_id" + _PRED_UPDATE_TS="_update_ts" + _PRED_CREATE_TS="_create_ts" + _PRED_DELETE_TS="_delete_ts" _CMP_EQ="eq" _CMP_NE="ne" @@ -1110,8 +1162,8 @@ class Query: _CMP_LE="le" _CMP_GT="gt" _CMP_GE="ge" - _CMP_RE="re_match" - _CMP_PRESENT="exists" + _CMP_RE_MATCH="re_match" + _CMP_EXISTS="exists" _CMP_TRUE="true" _CMP_FALSE="false" @@ -1119,34 +1171,219 @@ class Query: _LOGIC_OR="or" _LOGIC_NOT="not" - def __init__(self, kwargs={}): - pass -# if "impl" in kwargs: -# self.impl = kwargs["impl"] -# else: -# package = '' -# if "key" in kwargs: -# # construct using SchemaClassKey: -# self.impl = qmfengine.Query(kwargs["key"]) -# elif "object_id" in kwargs: -# self.impl = qmfengine.Query(kwargs["object_id"].impl) -# else: -# if "package" in kwargs: -# package = kwargs["package"] -# if "class" in kwargs: -# self.impl = qmfengine.Query(kwargs["class"], package) -# else: -# raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") + _valid_targets = [_TARGET_PACKAGES, _TARGET_OBJECT_ID, _TARGET_SCHEMA, _TARGET_SCHEMA_ID, + _TARGET_OBJECT, _TARGET_AGENT] + def __init__(self, qmap): + """ + """ + self._target_map = None + self._predicate = None + + if type(qmap) != dict: + raise TypeError("constructor must be of type dict") + + if self._TARGET in qmap: + self._target_map = qmap[self._TARGET] + if self._PREDICATE in qmap: + self.setPredicate(qmap[self._PREDICATE]) + return + else: + # assume qmap to be the target map + self._target_map = qmap[:] + + + def setPredicate(self, predicate): + """ + """ + if isinstance(predicate, QmfQueryPredicate): + self._predicate = predicate + elif type(predicate) == dict: + self._predicate = QmfQueryPredicate(predicate) + else: + raise TypeError("Invalid type for a predicate: %s" % str(predicate)) + + + def evaluate(self, qmfData): + """ + """ + # @todo: how to interpred qmfData against target?????? + # + if self._predicate: + return self._predicate.evaluate(qmfData) + # no predicate - always match + return True + + def getTarget(self): + for key in self._target_map.iterkeys(): + if key in self._valid_targets: + return key + return None + + def getPredicate(self): + return self._predicate + + def mapEncode(self): + _map = {} + _map[self._TARGET] = self._target_map + _map[self._PREDICATE] = self._predicate.mapEncode() + return _map + + def __repr__(self): + return str(self.mapEncode()) + + + +class QmfQueryPredicate(object): + """ + Class for Query predicates. + """ + _valid_cmp_ops = [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, + QmfQuery._CMP_GT, QmfQuery._CMP_LE, QmfQuery._CMP_GE, + QmfQuery._CMP_EXISTS, QmfQuery._CMP_RE_MATCH, + QmfQuery._CMP_TRUE, QmfQuery._CMP_FALSE] + _valid_logic_ops = [QmfQuery._LOGIC_AND, QmfQuery._LOGIC_OR, QmfQuery._LOGIC_NOT] + + + def __init__( self, pmap): + """ + {"op": listOf(operands)} + """ + self._oper = None + self._operands = [] + + logic_op = False + if type(pmap) == dict: + for key in pmap.iterkeys(): + logging.error("key = %s" % key) + if key in self._valid_cmp_ops: + # coparison operation - may have "name" and "value" + self._oper = key + break + if key in self._valid_logic_ops: + logic_op = True + self._oper = key + break + + if not self._oper: + raise TypeError("invalid predicate expression: '%s'" % str(pmap)) + + if type(pmap[self._oper]) == list or type(pmap[self._oper]) == tuple: + if logic_op: + for exp in pmap[self._oper]: + self.append(QmfQueryPredicate(exp)) + else: + self._operands = list(pmap[self._oper]) + + else: + raise TypeError("invalid predicate: '%s'" % str(pmap)) + + + def append(self, operand): + """ + Append another operand to a predicate expression + """ + logging.error("Appending: '%s'" % str(operand)) + self._operands.append(operand) -# def package_name(self): return self.impl.getPackage() -# def class_name(self): return self.impl.getClass() -# def object_id(self): -# _objid = self.impl.getObjectId() -# if _objid: -# return ObjectId(_objid) -# else: -# return None + + + def evaluate( self, qmfData ): + """ + """ + if not isinstance(qmfData, QmfData): + raise TypeError("Query expects to evaluate QmfData types.") + + if self._oper == QmfQuery._CMP_TRUE: + logging.debug("query evaluate TRUE") + return True + if self._oper == QmfQuery._CMP_FALSE: + logging.debug("query evaluate FALSE") + return False + + if self._oper in [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, + QmfQuery._CMP_LE, QmfQuery._CMP_GT, QmfQuery._CMP_GE, + QmfQuery._CMP_RE_MATCH]: + if len(self._operands) != 2: + logging.warning("Malformed query compare expression received: '%s, %s'" % + (self._oper, str(self._operands))) + return False + # @todo: support regular expression match + name = self._operands[0] + logging.debug("looking for: '%s'" % str(name)) + if not qmfData.hasProperty(name): + logging.warning("Malformed query, attribute '%s' not present." % name) + return False + arg1 = qmfData.getProperty(name) + arg1Type = type(arg1) + logging.debug("query evaluate %s: '%s' '%s' '%s'" % + (name, str(arg1), self._oper, str(self._operands[1]))) + try: + arg2 = arg1Type(self._operands[1]) + if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2 + if self._oper == QmfQuery._CMP_NE: return arg1 != arg2 + if self._oper == QmfQuery._CMP_LT: return arg1 < arg2 + if self._oper == QmfQuery._CMP_LE: return arg1 <= arg2 + if self._oper == QmfQuery._CMP_GT: return arg1 > arg2 + if self._oper == QmfQuery._CMP_GE: return arg1 >= arg2 + if self._oper == QmfQuery._CMP_RE_MATCH: + logging.error("!!! RE QUERY TBD !!!") + return False + except: + pass + logging.warning("Malformed query - %s: '%s' '%s' '%s'" % + (name, str(arg1), self._oper, str(self._operands[1]))) + return False + + + if self._oper == QmfQuery._CMP_EXISTS: + if len(self._operands) != 1: + logging.warning("Malformed query present expression received") + return False + name = self._operands[0] + logging.debug("query evaluate PRESENT: [%s]" % str(name)) + return qmfData.hasProperty(name) + + if self._oper == QmfQuery._LOGIC_AND: + logging.debug("query evaluate AND: '%s'" % str(self._operands)) + for exp in self._operands: + if not exp.evaluate(qmfData): + return False + return True + + if self._oper == QmfQuery._LOGIC_OR: + logging.debug("query evaluate OR: [%s]" % str(self._operands)) + for exp in self._operands: + if exp.evaluate(qmfData): + return True + return False + + if self._oper == QmfQuery._LOGIC_NOT: + logging.debug("query evaluate NOT: [%s]" % str(self._operands)) + for exp in self._operands: + if exp.evaluate(qmfData): + return False + return True + + logging.warning("Unrecognized query operator: [%s]" % str(self._oper)) + return False + + + def mapEncode(self): + _map = {} + _list = [] + for exp in self._operands: + if isinstance(exp, QmfQueryPredicate): + _list.append(exp.mapEncode()) + else: + _list.append(exp) + _map[self._oper] = _list + return _map + + + def __repr__(self): + return str(self.mapEncode()) + ##============================================================================== @@ -1696,7 +1933,7 @@ def SchemaMethodFactory( param ): -class SchemaClass(object): +class SchemaClass(QmfData): """ Base class for Data and Event Schema classes. """ @@ -1767,6 +2004,33 @@ class SchemaClass(object): return hstr + def getPropertyCount(self): return len(self._properties) + def getProperties(self): return self._properties.copy() + def addProperty(self, name, prop): + self._properties[name] = prop + # need to re-generate schema hash + self._classId = None + + def getProperty(self, name): + # check for meta-properties first + if name == QmfQuery._PRED_PACKAGE: + return self._pname + if name == QmfQuery._PRED_CLASS: + return self._cname + if name == QmfQuery._PRED_TYPE: + return self._type + if name == QmfQuery._PRED_HASH: + return self.getClassId().getHashString() + if name == QmfQuery._PRED_SCHEMA_ID: + return self.getClassId() + return super(SchemaClass, self).getProperty(name) + + def hasProperty(self, name): + if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS, + QmfQuery._PRED_TYPE, QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID]: + return True + super(SchemaClass, self).hasProperty(name) + def mapEncode(self): """ Return the map encoding of this schema. @@ -1838,19 +2102,9 @@ class SchemaObjectClass(SchemaClass): def getPrimaryKeyList(self): return self._pkeyNames[:] - def getPropertyCount(self): return len(self._properties) - def getProperties(self): return self._properties.copy() - def getProperty(self, name): return self._properties[name] - def getMethodCount(self): return len(self._methods) def getMethods(self): return self._methods.copy() def getMethod(self, name): return self._methods[name] - - def addProperty(self, name, prop): - self._properties[name] = prop - # need to re-generate schema hash - self._classId = None - def addMethod(self, name, method): self._methods[name] = method self._classId = None @@ -1916,18 +2170,6 @@ class SchemaEventClass(SchemaClass): _hash ) self._properties = _props.copy() - def getPropertyCount(self): return len(self._properties) - def getProperties(self): return self._properties.copy() - def getProperty(self, name): return self._properties[name] - def addProperty(self, name, prop): - self._properties[name] = prop - # need to re-generate schema hash - self._classId = None - - def mapEncode(self): - _map = super(SchemaEventClass, self).mapEncode() - return _map - def SchemaEventClassFactory( param ): diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index e956b07dab..cc8c284579 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -21,6 +21,7 @@ import os import logging import platform import time +import datetime import Queue from threading import Thread from threading import Lock @@ -31,7 +32,8 @@ from qpid.messaging import * from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode, - Query, AgentIdFactory, Notifier, _doQuery) + QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey, + QmfData) @@ -61,6 +63,7 @@ class _Mailbox(object): self._msgs.append(obj) # if was empty, notify waiters if len(self._msgs) == 1: + logging.error("Delivering @ %s" % time.time()) self._cv.notify() finally: self._cv.release() @@ -114,6 +117,7 @@ class SequencedWaiter(object): self.lock.acquire() try: if seq in self.pending: + logging.error("Putting seq %d @ %s" % (seq,time.time())) self.pending[seq].deliver(new_data) else: logging.error( "seq %d not found!" % seq ) @@ -211,7 +215,7 @@ class ObjectProxy(object): """ if not self._agent: raise Exception("No Agent associated with this object") - newer = self._agent.get_object(Query({"object_id":None}), timeout) + newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout) if newer == None: logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) @@ -247,10 +251,9 @@ class Agent(object): self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id) self._console = console self._sender = None - self._packages = [] # list of package names known to this agent - self._classes = {} # dict [key:class] of classes known to this agent + self._packages = {} # map of {package-name:[list of class-names], } for this agent self._subscriptions = [] # list of active standing subscriptions for this agent - self._announce_timestamp = long(0) # timestamp when last announce received + self._announce_timestamp = None # datetime when last announce received logging.debug( "Created Agent with address: [%s]" % self._address ) @@ -258,31 +261,33 @@ class Agent(object): return self._id def isActive(self): - return self._announce_timestamp != 0 + return self._announce_timestamp != None - def _send_msg(self, msg): + def _sendMsg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. """ msg.reply_to = self._console.address() - handle = self._console._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") - msg.correlation_id = str(handle) + # handle = self._console._req_correlation.allocate() + # if handle == 0: + # raise Exception("Can not allocate a correlation id!") + # msg.correlation_id = str(handle) + if correlation_id: + msg.correlation_id = str(correlation_id) self._sender.send(msg) - return handle + # return handle def get_packages(self): """ Return a list of the names of all packages known to this agent. """ - return self._packages[:] + return self._packages.keys() def get_classes(self): """ Return a dictionary [key:class] of classes known to this agent. """ - return self._classes[:] + return self._packages.copy() def get_objects(self, query, kwargs={}): """ @@ -329,6 +334,13 @@ class Agent(object): def __str__(self): return self.__repr__() + def _sendQuery(self, query, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.get_query), + properties={"method":"request"}, + content={MsgKey.query: query.mapEncode()}) + self._sendMsg( msg, correlation_id ) ##============================================================================== @@ -377,7 +389,11 @@ class Console(Thread): """ A Console manages communications to a collection of agents on behalf of an application. """ - def __init__(self, name=None, notifier=None, kwargs={}): + def __init__(self, name=None, notifier=None, + reply_timeout = 60, + # agent_timeout = 120, + agent_timeout = 60, + kwargs={}): """ @type name: str @param name: identifier for this console. Must be unique. @@ -392,9 +408,9 @@ class Console(Thread): self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name self._notifier = notifier + self._lock = Lock() self._conn = None self._session = None - self._lock = Lock() # dict of "agent-direct-address":class Agent entries self._agent_map = {} self._direct_recvr = None @@ -403,8 +419,10 @@ class Console(Thread): self._schema_cache = {} self._req_correlation = SequencedWaiter() self._operational = False - self._agent_discovery_predicate = None - self._default_timeout = 60 + self._agent_discovery_filter = None + self._reply_timeout = reply_timeout + self._agent_timeout = agent_timeout + self._next_agent_expire = None # lock out run() thread self._cv = Condition() # for passing WorkItems to the application @@ -431,12 +449,12 @@ class Console(Thread): """ logging.debug("Destroying Console...") if self._conn: - self.remove_connection(self._conn, timeout) + self.removeConnection(self._conn, timeout) logging.debug("Console Destroyed") - def add_connection(self, conn): + def addConnection(self, conn): """ Add a AMQP connection to the console. The console will setup a session over the connection. The console will then broadcast an Agent Locate Indication over @@ -449,8 +467,9 @@ class Console(Thread): raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) - self._direct_recvr = self._session.receiver(self._address) - self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION) + self._direct_recvr = self._session.receiver(self._address, capacity=1) + self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION, + capacity=1) self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE) # # Now that receivers are created, fire off the receive thread... @@ -460,7 +479,7 @@ class Console(Thread): - def remove_connection(self, conn, timeout=None): + def removeConnection(self, conn, timeout=None): """ Remove an AMQP connection from the console. Un-does the add_connection() operation, and releases any agents and sessions associated with the connection. @@ -489,40 +508,19 @@ class Console(Thread): self._direct_recvr.close() self._announce_recvr.close() self._locate_sender.close() + self._session.close() self._session = None self._conn = None logging.debug("console connection removal complete") - def address(self): + def getAddress(self): """ The AMQP address this Console is listening to. """ return self._address - def _createAgent( self, agent_id ): - """ - Factory to create/retrieve an agent for this console - """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") - - self._lock.acquire() - try: - if agent_id in self._agent_map: - return self._agent_map[agent_id] - - agent = Agent(agent_id, self) - agent._sender = self._session.sender(agent._address) - self._agent_map[agent_id] = agent - finally: - self._lock.release() - - return agent - - - def destroyAgent( self, agent ): """ Undoes create. @@ -562,16 +560,18 @@ class Console(Thread): raise Exception("Can not allocate a correlation id!") try: tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)) + query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, + QmfQuery._PREDICATE: + {QmfQuery._LOGIC_AND: + [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]}, + {QmfQuery._CMP_EQ: ["product", agent_id.product()]}, + {QmfQuery._CMP_EQ: ["name", agent_id.name()]}]}}) msg = Message(subject=makeSubject(OpCode.agent_locate), properties={"method":"request"}, - content={"query": {Query._TARGET: {Query._TARGET_AGENT_ID:None}, - Query._PREDICATE: - [Query._LOGIC_AND, - [Query._CMP_EQ, "vendor", agent_id.vendor()], - [Query._CMP_EQ, "product", agent_id.product()], - [Query._CMP_EQ, "name", agent_id.name()]]}}) + content={MsgKey.query: query.mapEncode()}) msg.reply_to = self._address msg.correlation_id = str(handle) + logging.debug("Sending Agent Locate (%s)" % time.time()) tmp_sender.send( msg ) except SendError, e: logging.error(str(e)) @@ -579,11 +579,13 @@ class Console(Thread): return None if not timeout: - timeout = self._default_timeout + timeout = self._reply_timeout new_agent = None + logging.debug("Waiting for response to Agent Locate (%s)" % timeout) self._req_correlation.get_data( handle, timeout ) self._req_correlation.release(handle) + logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: if agent_id in self._agent_map: @@ -605,25 +607,20 @@ class Console(Thread): try: msg = self._announce_recvr.fetch(timeout = 0) if msg: + logging.error( "Announce Msg Rcvd@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=False) - except: + except Empty: pass try: msg = self._direct_recvr.fetch(timeout = 0) if msg: + logging.error( "Direct Msg Rcvd@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=True) - except: + except Empty: pass - # try: - # logging.error("waiting for next rcvr...") - # rcvr = self._session.next_receiver() - # except: - # logging.error("exception during next_receiver()") - - # logging.error("rcvr=%s" % str(rcvr)) - + self._expireAgents() # check for expired agents if qLen == 0 and self._work_q.qsize() and self._notifier: # work queue went non-empty, kick @@ -634,10 +631,18 @@ class Console(Thread): self._notifier.indication() _callback_thread = None - while self._operational and \ - self._announce_recvr.pending() == 0 and \ - self._direct_recvr.pending(): - time.sleep(0.5) + if self._operational: + # wait for a message to arrive or an agent + # to expire + now = datetime.datetime.utcnow() + if self._next_agent_expire > now: + timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds + try: + logging.error("waiting for next rcvr (timeout=%s)..." % timeout) + self._session.next_receiver(timeout = timeout) + except Empty: + pass + logging.debug("Shutting down Console thread") @@ -649,7 +654,6 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - logging.error( "Message received from Agent! [%s]" % msg ) try: version,opcode = parseSubject(msg.subject) # @todo: deal with version mismatch!!! @@ -688,13 +692,13 @@ class Console(Thread): Process a received agent-ind message. This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ - logging.debug("_handleAgentIndMsg '%s'" % msg) + logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time())) - if Query._TARGET_AGENT_ID in cmap: + if MsgKey.agent_info in cmap: try: - agent_id = AgentIdFactory(cmap[Query._TARGET_AGENT_ID]) + agent_id = AgentIdFactory(cmap[MsgKey.agent_info]) except: - logging.debug("Bad agent-ind message received: '%s'" % msg) + logging.warning("Bad agent-ind message received: '%s'" % msg) return ignore = True @@ -702,12 +706,10 @@ class Console(Thread): correlated = False if msg.correlation_id: correlated = self._req_correlation.isValid(msg.correlation_id) - if direct and correlated: ignore = False - elif self._agent_discovery_predicate: - matched = _doQuery( self._agent_discovery_predicate, - agent_id.mapEncode() ) + elif self._agent_discovery_filter: + matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) ignore = not matched if not ignore: @@ -723,46 +725,109 @@ class Console(Thread): # need to create and add a new agent agent = self._createAgent(agent_id) - old_timestamp = agent._announce_timestamp - agent._announce_timestamp = time.time() + # lock out expiration scanning code + self._lock.acquire() + try: + old_timestamp = agent._announce_timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() - if old_timestamp == 0 and matched: - logging.debug("AGENT_ADDED for %s" % agent) + if old_timestamp == None and matched: + logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, {"agent": agent}) self._work_q.put(wi) if correlated: # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + logging.error("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) + + def _expireAgents(self): + """ + Check for expired agents and issue notifications when they expire. + """ + now = datetime.datetime.utcnow() + if self._next_agent_expire and now < self._next_agent_expire: + return + lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) + next_expire_delta = lifetime_delta + self._lock.acquire() + try: + logging.debug("!!! expiring agents '%s'" % now) + for agent in self._agent_map.itervalues(): + if agent._announce_timestamp: + agent_deathtime = agent._announce_timestamp + lifetime_delta + if agent_deathtime <= now: + logging.debug("AGENT_DELETED for %s" % agent) + agent._announce_timestamp = None + wi = WorkItem(WorkItem.AGENT_DELETED, {"agent":agent}) + self._work_q.put(wi) + else: + if (agent_deathtime - now) < next_expire_delta: + next_expire_delta = agent_deathtime - now + + self._next_agent_expire = now + next_expire_delta + logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + finally: + self._lock.release() + + + + def _createAgent( self, agent_id ): + """ + Factory to create/retrieve an agent for this console + """ + if not isinstance(agent_id, AgentId): + raise TypeError("parameter must be an instance of class AgentId") + + self._lock.acquire() + try: + if agent_id in self._agent_map: + return self._agent_map[agent_id] + + agent = Agent(agent_id, self) + agent._sender = self._session.sender(agent._address) + self._agent_map[agent_id] = agent + finally: + self._lock.release() + + # new agent - query for its schema database for + # seeding the schema cache (@todo) + # query = QmfQuery({QmfQuery._TARGET_SCHEMA_ID:None}) + # agent._sendQuery( query ) + + return agent + + + def enableAgentDiscovery(self, query=None): """ Called to enable the asynchronous Agent Discovery process. Once enabled, AGENT_ADD work items can arrive on the WorkQueue. """ - self._agent_discovery_predicate = [Query._CMP_TRUE] # default: match all indications if query: - if not isinstance(query, dict): - raise TypeError("parameter must be of type dict") - if Query._TARGET not in query or query[Query._TARGET] != {Query._TARGET_AGENT_ID:None}: - raise TypeError("query must be for an agent '%s'" % query) - if Query._PREDICATE in query: - self._agent_discovery_predicate = query[Query._PREDICATE][:] - + if not isinstance(query, QmfQuery): + raise TypeError("Type QmfQuery expected") + self._agent_discovery_filter = query + else: + # create a match-all agent query (no predicate) + self._agent_discovery_filter = QmfQuery({QmfQuery._TARGET: + {QmfQuery._TARGET_AGENT:None}}) def disableAgentDiscovery(self): """ Called to disable the async Agent Discovery process enabled by calling enableAgentDiscovery() """ - self._agent_discovery_predicate = None + self._agent_discovery_filter = None - def get_workitem_count(self): + def getWorkItemCount(self): """ Returns the count of pending WorkItems that can be retrieved. """ @@ -770,7 +835,7 @@ class Console(Thread): - def get_next_workitem(self, timeout=None): + def getNextWorkItem(self, timeout=None): """ Returns the next pending work item, or None if none available. @todo: subclass and return an Empty event instead. @@ -782,7 +847,7 @@ class Console(Thread): return wi - def release_workitem(self, wi): + def releaseWorkItem(self, wi): """ Return a WorkItem to the Console when it is no longer needed. @todo: call Queue.task_done() - only 2.5+ @@ -793,7 +858,6 @@ class Console(Thread): pass - # def get_packages(self): # plist = [] # for i in range(self.impl.packageCount()): @@ -1172,7 +1236,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, - SchemaObjectClassFactory, ObjectIdFactory, QmfData, QmfDescribed, + SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed, QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, QmfEvent) logging.getLogger().setLevel(logging.INFO) @@ -1184,7 +1248,7 @@ if __name__ == '__main__': logging.info( "Starting Console" ) _myConsole = Console() - _myConsole.add_connection( _c ) + _myConsole.addConnection( _c ) logging.info( "Finding Agent" ) _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 ) @@ -1192,7 +1256,7 @@ if __name__ == '__main__': logging.info( "Agent Found: %s" % _myAgent ) logging.info( "Removing connection" ) - _myConsole.remove_connection( _c, 10 ) + _myConsole.removeConnection( _c, 10 ) logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) @@ -1211,7 +1275,7 @@ if __name__ == '__main__': _noteMe = MyNotifier( 666 ) _myConsole = Console(notifier=_noteMe) - _myConsole.add_connection( _c ) + _myConsole.addConnection( _c ) _myConsole.enableAgentDiscovery() logging.info("Waiting...") @@ -1225,15 +1289,15 @@ if __name__ == '__main__': break - print("Work available = %d items!" % _myConsole.get_workitem_count()) - _wi = _myConsole.get_next_workitem(timeout=0) + print("Work available = %d items!" % _myConsole.getWorkItemCount()) + _wi = _myConsole.getNextWorkItem(timeout=0) while _wi: print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _wi = _myConsole.get_next_workitem(timeout=0) + _wi = _myConsole.getNextWorkItem(timeout=0) logging.info( "Removing connection" ) - _myConsole.remove_connection( _c, 10 ) + _myConsole.removeConnection( _c, 10 ) logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) @@ -1446,3 +1510,17 @@ if __name__ == '__main__': print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode()) + logging.info( "******** Messing around with Queries ********" ) + + _q1 = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, + QmfQuery._PREDICATE: + {QmfQuery._LOGIC_AND: + [{QmfQuery._CMP_EQ: ["vendor", "AVendor"]}, + {QmfQuery._CMP_EQ: ["product", "SomeProduct"]}, + {QmfQuery._CMP_EQ: ["name", "Thingy"]}, + {QmfQuery._LOGIC_OR: + [{QmfQuery._CMP_LE: ["temperature", -10]}, + {QmfQuery._CMP_FALSE: None}, + {QmfQuery._CMP_EXISTS: ["namey"]}]}]}}) + + print("_q1.mapEncode() = [%s]" % _q1) diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py index cd1644bca2..d413358dd8 100644 --- a/qpid/python/qmf/test/agent_test.py +++ b/qpid/python/qmf/test/agent_test.py @@ -43,14 +43,23 @@ _schema.addProperty( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) _schema.addProperty( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) +# these two properties are statistics +_schema.addProperty( "query_count", + SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.addProperty( "method_call_count", + SchemaProperty(qmfTypes.TYPE_UINT32)) +# These two properties can be set via the method call +_schema.addProperty( "set_string", + SchemaProperty(qmfTypes.TYPE_LSTR)) +_schema.addProperty( "set_int", + SchemaProperty(qmfTypes.TYPE_UINT32)) -# add method -_meth = SchemaMethod( _desc="A test method" ) -_meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) ) -_meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) ) -_meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) ) -_schema.addMethod( "meth_1", _meth ) +# add method +_meth = SchemaMethod( _desc="Method to set string and int in object." ) +_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) +_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) +_schema.addMethod( "set_meth", _meth ) # Add schema to Agent @@ -61,11 +70,19 @@ _agent.registerObjectClass(_schema) _obj = QmfAgentData( _agent, _schema ) _obj.setProperty("index1", 100) _obj.setProperty("index2", "a name" ) - +_obj.setProperty("set_string", "UNSET") +_obj.setProperty("set_int", 0) +_obj.setProperty("query_count", 0) +_obj.setProperty("method_call_count", 0) _agent.addObject( _obj ) + _agent.addObject( QmfAgentData( _agent, _schema, _props={"index1":99, - "index2": "another name"} )) + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) ## Now connect to the broker diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py index 03b4a98808..e649b2b8e4 100644 --- a/qpid/python/qmf/test/console_test.py +++ b/qpid/python/qmf/test/console_test.py @@ -4,7 +4,7 @@ from threading import Semaphore from qpid.messaging import * -from qmfCommon import (Notifier, Query) +from qmfCommon import (Notifier, QmfQuery) from qmfConsole import Console @@ -31,17 +31,19 @@ logging.info( "Starting Console" ) _notifier = ExampleNotifier() _myConsole = Console(notifier=_notifier) -_myConsole.add_connection( _c ) +_myConsole.addConnection( _c ) # Discover only agents from vendor "redhat.com" that # are a "qmf" product.... # @todo: replace "manual" query construction with # a formal class-based Query API -_query = {Query._TARGET: {Query._TARGET_AGENT_ID:None}, - Query._PREDICATE: - [Query._LOGIC_AND, - [Query._CMP_EQ, "vendor", "redhat.com"], - [Query._CMP_EQ, "product", "qmf"]]} +_query = {QmfQuery._TARGET: + {QmfQuery._TARGET_AGENT:None}, + QmfQuery._PREDICATE: + {QmfQuery._LOGIC_AND: + [{QmfQuery._CMP_EQ: ["vendor", "redhat.com"]}, + {QmfQuery._CMP_EQ: ["product", "qmf"]}]}} +_query = QmfQuery(_query) _myConsole.enableAgentDiscovery(_query) @@ -59,7 +61,7 @@ while not _done: _done = True logging.info( "Removing connection" ) -_myConsole.remove_connection( _c, 10 ) +_myConsole.removeConnection( _c, 10 ) logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) |