diff options
author | Ted Ross <tross@apache.org> | 2010-01-15 14:29:41 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-01-15 14:29:41 +0000 |
commit | 9ebba0e9a3c32d9263365140ae7066ff6f3ef38c (patch) | |
tree | d6bba5294eb300a3681b31e5f5eda4b3445e57ed | |
parent | c7e1aca1e90bacbf7ef579ffce686f187243b3c7 (diff) | |
download | qpid-python-9ebba0e9a3c32d9263365140ae7066ff6f3ef38c.tar.gz |
QPID-2261 - Branch patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@899644 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 303 | ||||
-rw-r--r-- | qpid/python/qmf/qmfCommon.py | 415 | ||||
-rw-r--r-- | qpid/python/qmf/qmfConsole.py | 468 | ||||
-rw-r--r-- | qpid/python/qmf/test/agent_test.py | 67 | ||||
-rw-r--r-- | qpid/python/qmf/test/console_test.py | 144 |
5 files changed, 995 insertions, 402 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 278eafedbc..89be722833 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -21,13 +21,56 @@ import sys import logging import datetime import time -from threading import Thread, Lock +import Queue +from threading import Thread, Lock, currentThread from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, makeSubject, parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass) + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + +# global flag that indicates which thread (if any) is +# running the agent notifier callback +_callback_thread=None + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class _MethodCallHandle(object): + """ + Private class used to hold context when handing off a method call to the + application. Given to the app in a WorkItem, provided to the agent when + method_response() is invoked. + """ + def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + self.correlation_id = correlation_id + self.reply_to = reply_to + self.meth_name = meth_name + self.oid = _oid + +class MethodCallParams(object): + """ + """ + def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + self._meth_name = name + self._oid = _oid + self._in_args = _in_args + self._user_id = _user_id + + def get_name(self): + return self._meth_name + + def get_object_id(self): + return self._oid + + def get_args(self): + return self._in_args + + def get_user_id(self): + return self._user_id + ##============================================================================== @@ -54,6 +97,8 @@ class Agent(Thread): self._schema_timestamp = long(0) self._schema = {} self._agent_data = {} + self._work_q = Queue.Queue() + self._work_q_put = False def get_name(self): return self.name @@ -63,9 +108,9 @@ class Agent(Thread): locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - logging.debug("my direct addr=%s" % my_addr) - logging.debug("agent.locate addr=%s" % locate_addr) - logging.debug("agent.ind addr=%s" % ind_addr) + logging.error("my direct addr=%s" % my_addr) + logging.error("agent.locate addr=%s" % locate_addr) + logging.error("agent.ind addr=%s" % ind_addr) self._conn = conn self._session = self._conn.session() @@ -133,30 +178,60 @@ class Agent(Thread): self._lock.release() - def methodResponse(self, context, status, text, arguments): - logging.error("!!!Agent.methodResponse() TBD!!!") + def method_response(self, handle, _out_args=None, _error=None): + """ + """ + if not isinstance(handle, _MethodCallHandle): + raise TypeError("Invalid handle passed to method_response!") + + _map = {SchemaMethod.KEY_NAME:handle.meth_name} + if handle.oid is not None: + _map[QmfData.KEY_OBJECT_ID] = handle.oid + if _out_args is not None: + _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() + if _error is not None: + if not isinstance(_error, QmfData): + raise TypeError("Invalid type for error - must be QmfData") + _map[SchemaMethod.KEY_ERROR] = _error.map_encode() + + msg = Message(subject=makeSubject(OpCode.response), + properties={"method":"response"}, + content={MsgKey.method:_map}) + msg.correlation_id = handle.correlation_id + + try: + tmp_snd = self._session.sender( handle.reply_to ) + tmp_snd.send(msg) + logging.debug("method-response sent to [%s]" % handle.reply_to) + except SendError, e: + logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) - def getWorkItemCount(self): + def get_workitem_count(self): """ Returns the count of pending WorkItems that can be retrieved. """ - logging.error("!!!Agent.getWorkItemCount() TBD!!!") + return self._work_q.qsize() - def getNextWorkItem(self, timeout=None): + def get_next_workitem(self, timeout=None): """ Obtains the next pending work item, or None if none available. """ - logging.error("!!!Agent.getNextWorkItem() TBD!!!") + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi - def releaseWorkItem(self, wi): + def release_workitem(self, wi): """ Releases a WorkItem instance obtained by getNextWorkItem(). Called when the application has finished processing the WorkItem. """ - logging.error("!!!Agent.releaseWorkItem() TBD!!!") + pass def run(self): + global _callback_thread next_heartbeat = datetime.datetime.utcnow() while self._running: @@ -174,19 +249,29 @@ class Agent(Thread): except Empty: continue - try: - msg = self._locate_receiver.fetch(timeout = 0) - except Empty: - msg = None - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) - - try: - msg = self._direct_receiver.fetch(timeout = 0) - except Empty: - msg = None - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) + while True: + try: + msg = self._locate_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) + + while True: + try: + msg = self._direct_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) + + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling agent notifier.indication") + self._notifier.indication() + _callback_thread = None # # Private: @@ -227,7 +312,7 @@ class Agent(Thread): elif opcode == OpCode.get_query: self._handleQueryMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.method_req: - logging.warning("!!! METHOD_REQ TBD !!!") + self._handleMethodReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.cancel_subscription: logging.warning("!!! CANCEL_SUB TBD !!!") elif opcode == OpCode.create_subscription: @@ -253,7 +338,7 @@ class Agent(Thread): query = cmap.get(MsgKey.query) if query is not None: # fake a QmfData containing my identifier for the query compare - tmpData = QmfData(_values={"_name": self.get_name()}) + tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) reply = QmfQuery(query).evaluate(tmpData) if reply: @@ -278,24 +363,53 @@ class Agent(Thread): if "method" in props and props["method"] == "request": qmap = cmap.get(MsgKey.query) if qmap: - query = QmfQuery(qmap) - target = query.getTarget() - if target == QmfQuery._TARGET_PACKAGES: + query = QmfQuery.from_map(qmap) + target = query.get_target() + if target == QmfQuery.TARGET_PACKAGES: self._queryPackages( msg, query ) - elif target == QmfQuery._TARGET_SCHEMA_ID: + elif target == QmfQuery.TARGET_SCHEMA_ID: self._querySchema( msg, query, _idOnly=True ) - elif target == QmfQuery._TARGET_SCHEMA: + elif target == QmfQuery.TARGET_SCHEMA: self._querySchema( msg, query) - elif target == QmfQuery._TARGET_AGENT: - logging.warning("!!! Query TARGET=AGENT TBD !!!") - elif target == QmfQuery._TARGET_OBJECT_ID: - logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!") - elif target == QmfQuery._TARGET_OBJECT: - logging.warning("!!! Query TARGET=OBJECT TBD !!!") + elif target == QmfQuery.TARGET_AGENT: + logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + elif target == QmfQuery.TARGET_OBJECT_ID: + self._queryData(msg, query, _idOnly=True) + elif target == QmfQuery.TARGET_OBJECT: + self._queryData(msg, query) else: logging.warning("Unrecognized query target: '%s'" % str(target)) + + def _handleMethodReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Method Request + """ + if "method" in props and props["method"] == "request": + mname = cmap.get(SchemaMethod.KEY_NAME) + if not mname: + logging.warning("Invalid method call from '%s': no name" + % msg.reply_to) + return + + in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) + oid = cmap.get(QmfData.KEY_OBJECT_ID) + + print("!!! ci=%s rt=%s mn=%s oid=%s" % + (msg.correlation_id, + msg.reply_to, + mname, + oid)) + + handle = _MethodCallHandle(msg.correlation_id, + msg.reply_to, + mname, + oid) + param = MethodCallParams( mname, oid, in_args, msg.user_id) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) + self._work_q_put = True + def _queryPackages(self, msg, query): """ Run a query against the list of known packages @@ -304,7 +418,7 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})): + if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): pnames.append(name) finally: self._lock.release() @@ -326,36 +440,97 @@ class Agent(Thread): """ """ schemas = [] - self._lock.acquire() - try: - for sid,val in self._schema.iteritems(): - if query.evaluate(val): - if _idOnly: - schemas.append(sid.map_encode()) - else: - schemas.append(val.map_encode()) - finally: - self._lock.release() + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._schema.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema + self._lock.acquire() + try: + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) + finally: + self._lock.release() - try: - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: - content = {MsgKey.schema_id: schemas} - else: - content = {MsgKey.schema:schemas} + tmp_snd = self._session.sender( msg.reply_to ) - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: tmp_snd.send(m) logging.debug("schema_id sent to [%s]" % msg.reply_to) except SendError, e: logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + def _queryData( self, msg, query, _idOnly=False ): + """ + """ + data_objs = [] + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._agent_data.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + data_objs.append(query.get_id()) + else: + data_objs.append(found.map_encode()) + else: # otherwise, evaluate all data + self._lock.acquire() + try: + for oid,val in self._agent_data.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) + finally: + self._lock.release() + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.object_id:data_objs} + else: + content = {MsgKey.data_obj:data_objs} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("data reply sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) ##============================================================================== @@ -407,7 +582,7 @@ class QmfAgentData(QmfData): if __name__ == '__main__': # static test cases - no message passing, just exercise API - from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes, + from qmfCommon import (AgentName, SchemaProperty, qmfTypes, SchemaMethod, SchemaEventClass) logging.getLogger().setLevel(logging.INFO) @@ -436,8 +611,8 @@ if __name__ == '__main__': # add method _meth = SchemaMethod(_desc="Method to set string and int in object." ) - _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) _schema.add_method( "set_meth", _meth ) # Add schema to Agent diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index 9bcb166c6b..580d86b7a7 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -49,6 +49,9 @@ class MsgKey(object): package_info = "package_info" schema_id = "schema_id" schema = "schema" + object_id="object_id" + data_obj="object" + method="method" class OpCode(object): @@ -92,6 +95,11 @@ def parseSubject(_sub): return _sub[3:].split('.', 1) +##============================================================================== +## Async Event Model +##============================================================================== + + class Notifier(object): """ Virtual base class that defines a call back which alerts the application that @@ -107,6 +115,48 @@ class Notifier(object): +class WorkItem(object): + """ + Describes an event that has arrived for the application to process. The + Notifier is invoked when one or more of these WorkItems become available + for processing. + """ + # Enumeration of the types of WorkItems produced on the Console + AGENT_ADDED=1 + AGENT_DELETED=2 + NEW_PACKAGE=3 + NEW_CLASS=4 + OBJECT_UPDATE=5 + EVENT_RECEIVED=7 + AGENT_HEARTBEAT=8 + # Enumeration of the types of WorkItems produced on the Agent + METHOD_CALL=1000 + QUERY=1001 + SUBSCRIBE=1002 + UNSUBSCRIBE=1003 + + def __init__(self, kind, handle, _params=None): + """ + Used by the Console to create a work item. + + @type kind: int + @param kind: work item type + """ + self._kind = kind + self._handle = handle + self._params = _params + + def get_type(self): + return self._kind + + def get_handle(self): + return self._handle + + def get_params(self): + return self._params + + + ##============================================================================== ## Addressing ##============================================================================== @@ -155,15 +205,15 @@ class AgentName(object): """ _separator = ":" - def __init__(self, vendor, product, name, str_=None): + def __init__(self, vendor, product, name, _str=None): """ Note: this object must be immutable, as it is used to index into a dictionary """ - if str_: + if _str is not None: # construct from string representation - if _str.count(AgentId._separator) < 2: - raise TypeError("AgentId string format must be 'vendor.product.name'") - self._vendor, self._product, self._name = param.split(AgentId._separator) + if _str.count(AgentName._separator) < 2: + raise TypeError("AgentName string format must be 'vendor.product.name'") + self._vendor, self._product, self._name = _str.split(AgentName._separator) else: self._vendor = vendor self._product = product @@ -299,9 +349,9 @@ class QmfData(_mapEncoder): _object_id=_object_id, _schema=_schema, _const=_const) create = classmethod(_create) - def _from_map(cls, map_, _schema=None, _const=False): + def __from_map(cls, map_, _schema=None, _const=False): return cls(_map=map_, _schema=_schema, _const=_const) - from_map = classmethod(_from_map) + from_map = classmethod(__from_map) def is_managed(self): return self._object_id is not None @@ -671,9 +721,9 @@ class Arguments(object): -class MethodResponse(object): - def __init__(self, impl): - pass +#class MethodResponse(object): +# def __init__(self, impl): +# pass # self.impl = qmfengine.MethodResponse(impl) @@ -756,140 +806,173 @@ class MethodResponse(object): class QmfQuery(_mapEncoder): - _TARGET="what" - _PREDICATE="where" + KEY_TARGET="what" + KEY_PREDICATE="where" + KEY_ID="id" + + ### Query Types + ID=1 + PREDICATE=2 #### Query Targets #### - _TARGET_PACKAGES="schema_package" + TARGET_PACKAGES="schema_package" # (returns just package names) - # predicate key(s): + # allowed predicate key(s): # - #_PRED_PACKAGE + # SchemaClassId.KEY_PACKAGE - - _TARGET_SCHEMA_ID="schema_id" - _TARGET_SCHEMA="schema" - # predicate key(s): + TARGET_SCHEMA_ID="schema_id" + TARGET_SCHEMA="schema" + # allowed predicate key(s): # - #_PRED_PACKAGE - #_PRED_CLASS - #_PRED_TYPE - #_PRED_HASH - #_PRED_SCHEMA_ID + # SchemaClassId.KEY_PACKAGE + # SchemaClassId.KEY_CLASS + # SchemaClassId.KEY_TYPE + # SchemaClassId.KEY_HASH + # SchemaClass.KEY_SCHEMA_ID # name of property (exist test only) # name of method (exist test only) - - _TARGET_AGENT="agent" - # predicate keys(s): + TARGET_AGENT="agent" + # allowed predicate keys(s): # - #_PRED_VENDOR="_vendor" - #_PRED_PRODUCT="_product" - #_PRED_NAME="_name" - - _TARGET_OBJECT_ID="object_id" - _TARGET_OBJECT="object" - # package and class names must be suppled in the target value: - # predicate on all values or meta-values[tbd] + KEY_AGENT_NAME="_name" + + TARGET_OBJECT_ID="object_id" + TARGET_OBJECT="object" + # allowed predicate keys(s): # - #_PRED_PACKAGE - #_PRED_CLASS - #_PRED_TYPE - #_PRED_HASH - #_primary_key - #_PRED_SCHEMA_ID - #_PRED_OBJECT_ID - #_PRED_UPDATE_TS - #_PRED_CREATE_TS - #_PRED_DELETE_TS - #<name of property> - - _PRED_PACKAGE="_package_name" - _PRED_CLASS="_class_name" - _PRED_TYPE="_type" - _PRED_HASH="_hash_str" - _PRED_VENDOR="_vendor" - _PRED_PRODUCT="_product" - _PRED_NAME="_name" - _PRED_PRIMARY_KEY="_primary_key" - _PRED_SCHEMA_ID="_schema_id" - _PRED_OBJECT_ID="_object_id" - _PRED_UPDATE_TS="_update_ts" - _PRED_CREATE_TS="_create_ts" - _PRED_DELETE_TS="_delete_ts" - - _CMP_EQ="eq" - _CMP_NE="ne" - _CMP_LT="lt" - _CMP_LE="le" - _CMP_GT="gt" - _CMP_GE="ge" - _CMP_RE_MATCH="re_match" - _CMP_EXISTS="exists" - _CMP_TRUE="true" - _CMP_FALSE="false" - - _LOGIC_AND="and" - _LOGIC_OR="or" - _LOGIC_NOT="not" - - _valid_targets = [_TARGET_PACKAGES, _TARGET_OBJECT_ID, _TARGET_SCHEMA, _TARGET_SCHEMA_ID, - _TARGET_OBJECT, _TARGET_AGENT] - - def __init__(self, qmap): + # SchemaClassId.KEY_PACKAGE + # SchemaClassId.KEY_CLASS + # SchemaClassId.KEY_TYPE + # SchemaClassId.KEY_HASH + # QmfData.KEY_SCHEMA_ID + # QmfData.KEY_OBJECT_ID + # QmfData.KEY_UPDATE_TS + # QmfData.KEY_CREATE_TS + # QmfData.KEY_DELETE_TS + # <name of data value> + + CMP_EQ="eq" + CMP_NE="ne" + CMP_LT="lt" + CMP_LE="le" + CMP_GT="gt" + CMP_GE="ge" + CMP_RE_MATCH="re_match" + CMP_EXISTS="exists" + CMP_TRUE="true" + CMP_FALSE="false" + + LOGIC_AND="and" + LOGIC_OR="or" + LOGIC_NOT="not" + + _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID, + TARGET_OBJECT, TARGET_AGENT] + + def __init__(self, _target=None, _target_params=None, _predicate=None, + _id=None, _map=None): """ """ - self._target_map = None - self._predicate = None + if _map is not None: + target_map = _map.get(self.KEY_TARGET) + if not target_map: + raise TypeError("QmfQuery requires a target map") + + _target = None + for key in target_map.iterkeys(): + if key in self._valid_targets: + _target = key + break - if type(qmap) != dict: - raise TypeError("constructor must be of type dict") + _target_params = target_map.get(_target) + + _id = _map.get(self.KEY_ID) + if _id is not None: + # Convert identifier to native type if necessary + if _target == self.TARGET_SCHEMA: + _id = SchemaClassId.from_map(_id) + else: + pred = _map.get(self.KEY_PREDICATE) + if pred: + _predicate = QmfQueryPredicate(pred) + + self._target = _target + if not self._target: + raise TypeError("QmfQuery requires a target value") + self._target_params = _target_params + self._predicate = _predicate + self._id = _id + + # constructors + def _create_wildcard(cls, target, _target_params=None): + return cls(_target=target, _target_params=_target_params) + create_wildcard = classmethod(_create_wildcard) + + def _create_predicate(cls, target, predicate, _target_params=None): + return cls(_target=target, _target_params=_target_params, + _predicate=predicate) + create_predicate = classmethod(_create_predicate) + + def _create_id(cls, target, ident, _target_params=None): + return cls(_target=target, _target_params=_target_params, _id=ident) + create_id = classmethod(_create_id) - if self._TARGET in qmap: - self._target_map = qmap[self._TARGET] - if self._PREDICATE in qmap: - self.setPredicate(qmap[self._PREDICATE]) - return + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def get_target(self): + return self._target + + def get_target_param(self): + return self._target_params + + def get_selector(self): + if self._id: + return QmfQuery.ID else: - # assume qmap to be the target map - self._target_map = qmap[:] + return QmfQuery.PREDICATE + def get_id(self): + return self._id - def setPredicate(self, predicate): + def get_predicate(self): """ """ - if isinstance(predicate, QmfQueryPredicate): - self._predicate = predicate - elif type(predicate) == dict: - self._predicate = QmfQueryPredicate(predicate) - else: - raise TypeError("Invalid type for a predicate: %s" % str(predicate)) - + return self._predicate def evaluate(self, qmfData): """ """ - # @todo: how to interpred qmfData against target?????? - # + if self._id: + if self._target == self.TARGET_SCHEMA: + return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and + qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id) + elif self._target == self.TARGET_OBJECT: + return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and + qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id) + elif self._target == self.TARGET_AGENT: + return (qmfData.has_value(self.KEY_AGENT_NAME) and + qmfData.get_value(self.KEY_AGENT_NAME) == self._id) + + raise Exception("Unsupported query target '%s'" % str(self._target)) + if self._predicate: return self._predicate.evaluate(qmfData) - # no predicate - always match + # no predicate and no id - always match return True - def getTarget(self): - for key in self._target_map.iterkeys(): - if key in self._valid_targets: - return key - return None - - def getPredicate(self): - return self._predicate - def map_encode(self): - _map = {} - _map[self._TARGET] = self._target_map - if self._predicate is not None: - _map[self._PREDICATE] = self._predicate.map_encode() + _map = {self.KEY_TARGET: {self._target: self._target_params}} + if self._id is not None: + if isinstance(self._id, _mapEncoder): + _map[self.KEY_ID] = self._id.map_encode() + else: + _map[self.KEY_ID] = self._id + elif self._predicate is not None: + _map[self.KEY_PREDICATE] = self._predicate.map_encode() return _map def __repr__(self): @@ -901,11 +984,11 @@ class QmfQueryPredicate(_mapEncoder): """ Class for Query predicates. """ - _valid_cmp_ops = [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, - QmfQuery._CMP_GT, QmfQuery._CMP_LE, QmfQuery._CMP_GE, - QmfQuery._CMP_EXISTS, QmfQuery._CMP_RE_MATCH, - QmfQuery._CMP_TRUE, QmfQuery._CMP_FALSE] - _valid_logic_ops = [QmfQuery._LOGIC_AND, QmfQuery._LOGIC_OR, QmfQuery._LOGIC_NOT] + _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, + QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE, + QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH, + QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE] + _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT] def __init__( self, pmap): @@ -919,7 +1002,7 @@ class QmfQueryPredicate(_mapEncoder): if type(pmap) == dict: for key in pmap.iterkeys(): if key in self._valid_cmp_ops: - # coparison operation - may have "name" and "value" + # comparison operation - may have "name" and "value" self._oper = key break if key in self._valid_logic_ops: @@ -955,16 +1038,16 @@ class QmfQueryPredicate(_mapEncoder): if not isinstance(qmfData, QmfData): raise TypeError("Query expects to evaluate QmfData types.") - if self._oper == QmfQuery._CMP_TRUE: + if self._oper == QmfQuery.CMP_TRUE: logging.debug("query evaluate TRUE") return True - if self._oper == QmfQuery._CMP_FALSE: + if self._oper == QmfQuery.CMP_FALSE: logging.debug("query evaluate FALSE") return False - if self._oper in [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, - QmfQuery._CMP_LE, QmfQuery._CMP_GT, QmfQuery._CMP_GE, - QmfQuery._CMP_RE_MATCH]: + if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, + QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE, + QmfQuery.CMP_RE_MATCH]: if len(self._operands) != 2: logging.warning("Malformed query compare expression received: '%s, %s'" % (self._oper, str(self._operands))) @@ -982,13 +1065,13 @@ class QmfQueryPredicate(_mapEncoder): logging.debug("query evaluate %s: '%s' '%s' '%s'" % (name, str(arg1), self._oper, str(arg2))) try: - if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2 - if self._oper == QmfQuery._CMP_NE: return arg1 != arg2 - if self._oper == QmfQuery._CMP_LT: return arg1 < arg2 - if self._oper == QmfQuery._CMP_LE: return arg1 <= arg2 - if self._oper == QmfQuery._CMP_GT: return arg1 > arg2 - if self._oper == QmfQuery._CMP_GE: return arg1 >= arg2 - if self._oper == QmfQuery._CMP_RE_MATCH: + if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2 + if self._oper == QmfQuery.CMP_NE: return arg1 != arg2 + if self._oper == QmfQuery.CMP_LT: return arg1 < arg2 + if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2 + if self._oper == QmfQuery.CMP_GT: return arg1 > arg2 + if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2 + if self._oper == QmfQuery.CMP_RE_MATCH: logging.error("!!! RE QUERY TBD !!!") return False except: @@ -998,7 +1081,7 @@ class QmfQueryPredicate(_mapEncoder): return False - if self._oper == QmfQuery._CMP_EXISTS: + if self._oper == QmfQuery.CMP_EXISTS: if len(self._operands) != 1: logging.warning("Malformed query present expression received") return False @@ -1006,21 +1089,21 @@ class QmfQueryPredicate(_mapEncoder): logging.debug("query evaluate PRESENT: [%s]" % str(name)) return qmfData.has_value(name) - if self._oper == QmfQuery._LOGIC_AND: + if self._oper == QmfQuery.LOGIC_AND: logging.debug("query evaluate AND: '%s'" % str(self._operands)) for exp in self._operands: if not exp.evaluate(qmfData): return False return True - if self._oper == QmfQuery._LOGIC_OR: + if self._oper == QmfQuery.LOGIC_OR: logging.debug("query evaluate OR: [%s]" % str(self._operands)) for exp in self._operands: if exp.evaluate(qmfData): return True return False - if self._oper == QmfQuery._LOGIC_NOT: + if self._oper == QmfQuery.LOGIC_NOT: logging.debug("query evaluate NOT: [%s]" % str(self._operands)) for exp in self._operands: if exp.evaluate(qmfData): @@ -1129,7 +1212,7 @@ class SchemaClassId(_mapEncoder): KEY_HASH="_hash_str" TYPE_DATA = "_data" - TYPE_EVENT = "event" + TYPE_EVENT = "_event" _valid_types=[TYPE_DATA, TYPE_EVENT] _schemaHashStrFormat = "%08x-%08x-%08x-%08x" @@ -1333,7 +1416,7 @@ class SchemaProperty(_mapEncoder): def getAccess(self): return self._access - def isOptional(self): return self._isOptional + def is_optional(self): return self._isOptional def isIndex(self): return self._isIndex @@ -1351,9 +1434,9 @@ class SchemaProperty(_mapEncoder): def isParentRef(self): return self._isParentRef - def getDirection(self): return self._dir + def get_direction(self): return self._dir - def getDefault(self): return self._default + def get_default(self): return self._default def map_encode(self): """ @@ -1402,7 +1485,11 @@ class SchemaMethod(_mapEncoder): map["arguments"] = map of "name"=<SchemaProperty> pairs. map["desc"] = str, description of the method """ - def __init__(self, args={}, _desc=None, _map=None): + KEY_NAME="_name" + KEY_ARGUMENTS="_arguments" + KEY_DESC="_desc" + KEY_ERROR="_error" + def __init__(self, _args={}, _desc=None, _map=None): """ Construct a SchemaMethod. @@ -1412,14 +1499,16 @@ class SchemaMethod(_mapEncoder): @param _desc: Human-readable description of the schema """ if _map is not None: - _desc = _map.get("desc") - margs = _map.get("arguments", args) - # margs are in map format - covert to SchemaProperty - args = {} - for name,val in margs.iteritems(): - args[name] = SchemaProperty.from_map(val) - - self._arguments = args.copy() + _desc = _map.get(self.KEY_DESC) + margs = _map.get(self.KEY_ARGUMENTS) + if margs: + # margs are in map format - covert to SchemaProperty + tmp_args = {} + for name,val in margs.iteritems(): + tmp_args[name] = SchemaProperty.from_map(val) + _args=tmp_args + + self._arguments = _args.copy() self._desc = _desc # map constructor @@ -1427,13 +1516,13 @@ class SchemaMethod(_mapEncoder): return cls(_map=map_) from_map = classmethod(_from_map) - def getDesc(self): return self._desc + def get_desc(self): return self._desc - def getArgCount(self): return len(self._arguments) + def get_arg_count(self): return len(self._arguments) - def getArguments(self): return self._arguments.copy() + def get_arguments(self): return self._arguments.copy() - def getArgument(self, name): return self._arguments[name] + def get_argument(self, name): return self._arguments.get(name) def add_argument(self, name, schema): """ @@ -1447,6 +1536,9 @@ class SchemaMethod(_mapEncoder): """ if not isinstance(schema, SchemaProperty): raise TypeError("argument must be a SchemaProperty class") + # "Input" argument, by default + if schema._dir is None: + schema._dir = "I" self._arguments[name] = schema def map_encode(self): @@ -1457,8 +1549,8 @@ class SchemaMethod(_mapEncoder): _args = {} for name,val in self._arguments.iteritems(): _args[name] = val.map_encode() - _map["arguments"] = _args - if self._desc: _map["desc"] = self._desc + _map[self.KEY_ARGUMENTS] = _args + if self._desc: _map[self.KEY_DESC] = self._desc return _map def __repr__(self): @@ -1493,7 +1585,6 @@ class SchemaClass(QmfData): map["_schema_id"] = map representation of a SchemaClassId instance map["_primary_key_names"] = order list of primary key names """ - KEY_SCHEMA_ID="_schema_id" KEY_PRIMARY_KEY_NAMES="_primary_key_names" KEY_DESC = "_desc" @@ -1671,6 +1762,11 @@ class SchemaObjectClass(SchemaClass): if self._classId.get_type() != SchemaClassId.TYPE_DATA: raise TypeError("Invalid ClassId type for data schema: %s" % self._classId) + # map constructor + def __from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(__from_map) + def get_id_names(self): return self._object_id_names[:] @@ -1725,6 +1821,11 @@ class SchemaEventClass(SchemaClass): raise TypeError("Invalid ClassId type for event schema: %s" % self._classId) + # map constructor + def __from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(__from_map) + diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index 027ce163e5..97acdd767e 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -28,16 +28,18 @@ from threading import Lock from threading import currentThread from threading import Condition -from qpid.messaging import * +from qpid.messaging import Connection, Message, Empty, SendError from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, QmfQueryPredicate, MsgKey, QmfData, QmfAddress, - AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION) + AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + SchemaClass, SchemaClassId, SchemaEventClass, + SchemaObjectClass, WorkItem, SchemaMethod) # global flag that indicates which thread (if any) is -# running the console callback +# running the console notifier callback _callback_thread=None @@ -243,13 +245,86 @@ class QmfConsoleData(QmfData): logging.error(" TBD!!!") return None - def invoke_method(self, name, _in_args=None, _reply_handle=None, + def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ invoke the named method. """ - logging.error(" TBD!!!") - return None + assert self._agent + assert self._agent._console + + oid = self.get_object_id() + if oid is None: + raise ValueError("Cannot invoke methods on unmanaged objects.") + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + if self._schema: + # validate + ms = self._schema.get_method(name) + if ms is None: + raise ValueError("Method '%s' is undefined." % ms) + + for aname,prop in ms.get_arguments().iteritems(): + if aname not in _in_args: + if prop.get_default(): + _in_args[aname] = prop.get_default() + elif not prop.is_optional(): + raise ValueError("Method '%s' requires argument '%s'" + % (name, aname)) + for aname in _in_args.iterkeys(): + prop = ms.get_argument(aname) + if prop is None: + raise ValueError("Method '%s' does not define argument" + " '%s'" % (name, aname)) + if "I" not in prop.get_direction(): + raise ValueError("Method '%s' argument '%s' is not an" + " input." % (name, aname)) + + # @todo check if value is correct (type, range, etc) + + handle = self._agent._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {self.KEY_OBJECT_ID:str(oid), + SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._agent._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._agent._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) + self._agent._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + return MethodResult(_error=QmfData.from_map(error)) + else: + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) @@ -364,6 +439,54 @@ class Agent(object): """ pass + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + """ + assert self._console + + if _timeout is None: + _timeout = self._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + handle = self._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._console._req_correlation.get_data(handle, _timeout) + self._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), + _error=_map.get(SchemaMethod.KEY_ERROR)) + def __repr__(self): return str(self._address) @@ -379,45 +502,46 @@ class Agent(object): self._sendMsg( msg, correlation_id ) + def _sendMethodReq(self, mr_map, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.method_req), + properties={"method":"request"}, + content=mr_map) + self._sendMsg( msg, correlation_id ) + + ##============================================================================== - ## CONSOLE + ## METHOD CALL ##============================================================================== +class MethodResult(object): + def __init__(self, _out_args=None, _error=None): + self._error = _error + self._out_args = _out_args + def succeeded(self): + return self._error is None -class WorkItem(object): - """ - Describes an event that has arrived at the Console for the - application to process. The Notifier is invoked when one or - more of these WorkItems become available for processing. - """ - # - # Enumeration of the types of WorkItems produced by the Console - # - AGENT_ADDED = 1 - AGENT_DELETED = 2 - NEW_PACKAGE = 3 - NEW_CLASS = 4 - OBJECT_UPDATE = 5 - EVENT_RECEIVED = 7 - AGENT_HEARTBEAT = 8 + def get_exception(self): + return self._error - def __init__(self, kind, kwargs={}): - """ - Used by the Console to create a work item. - - @type kind: int - @param kind: work item type - """ - self._kind = kind - self._param_map = kwargs + def get_arguments(self): + return self._out_args + + def get_argument(self, name): + arg = None + if self._out_args: + arg = self._out_args.get(name) + return arg + + + ##============================================================================== + ## CONSOLE + ##============================================================================== - def getType(self): - return self._kind - def getParams(self): - return self._param_map @@ -465,6 +589,7 @@ class Console(Thread): self._cv = Condition() # for passing WorkItems to the application self._work_q = Queue.Queue() + self._work_q_put = False ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() @@ -512,14 +637,15 @@ class Console(Thread): " x-properties:" " {type:direct}}}", capacity=1) + logging.error("local addr=%s" % self._address) ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - logging.debug("agent.ind addr=%s" % ind_addr) + logging.error("agent.ind addr=%s" % ind_addr) self._announce_recvr = self._session.receiver(str(ind_addr) + ";{create:always," " node-properties:{type:topic}}", capacity=1) locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - logging.debug("agent.locate addr=%s" % locate_addr) + logging.error("agent.locate addr=%s" % locate_addr) self._locate_sender = self._session.sender(str(locate_addr) + ";{create:always," " node-properties:{type:topic}}") @@ -615,9 +741,7 @@ class Console(Thread): " x-properties:" " {type:direct}}}") - query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, - QmfQuery._PREDICATE: - {QmfQuery._CMP_EQ: ["_name", name]}}) + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject=makeSubject(OpCode.agent_locate), properties={"method":"request"}, content={MsgKey.query: query.map_encode()}) @@ -630,7 +754,7 @@ class Console(Thread): self._req_correlation.release(handle) return None - if not timeout: + if timeout is None: timeout = self._reply_timeout new_agent = None @@ -650,6 +774,7 @@ class Console(Thread): """ """ + target = query.get_target() handle = self._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") @@ -667,15 +792,63 @@ class Console(Thread): logging.debug("Waiting for response to Query (%s)" % timeout) reply = self._req_correlation.get_data(handle, timeout) self._req_correlation.release(handle) - logging.debug("Agent Query wait ended (%s)" % time.time()) - if reply: - print("Agent Query Reply='%s'" % reply) - return reply.content - else: - print("Agent Query FAILED!!!") + if not reply: + logging.debug("Agent Query wait timed-out.") return None - + if target == QmfQuery.TARGET_PACKAGES: + # simply pass back the list of package names + logging.debug("Response to Packet Query received") + return reply.content.get(MsgKey.package_info) + elif target == QmfQuery.TARGET_OBJECT_ID: + # simply pass back the list of object_id's + logging.debug("Response to Object Id Query received") + return reply.content.get(MsgKey.object_id) + elif target == QmfQuery.TARGET_SCHEMA_ID: + logging.debug("Response to Schema Id Query received") + id_list = [] + for sid_map in reply.content.get(MsgKey.schema_id): + id_list.append(SchemaClassId.from_map(sid_map)) + return id_list + elif target == QmfQuery.TARGET_SCHEMA: + logging.debug("Response to Schema Query received") + schema_list = [] + for schema_map in reply.content.get(MsgKey.schema): + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + schema_list.append(schema) + self._add_schema(schema) + return schema_list + elif target == QmfQuery.TARGET_OBJECT: + logging.debug("Response to Object Query received") + obj_list = [] + for obj_map in reply.content.get(MsgKey.data_obj): + # if the object references a schema, fetch it + sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + schema = self._fetch_schema(sid, _agent=agent, + _timeout=timeout) + if not schema: + logging.warning("Unknown schema, id=%s" % sid) + continue + obj = QmfConsoleData(map_=obj_map, agent=agent, + _schema=schema) + else: + # no schema needed + obj = QmfConsoleData(map_=obj_map, agent=agent) + obj_list.append(obj) + return obj_list + else: + logging.warning("Unexpected Target for a Query: '%s'" % target) + return None def run(self): global _callback_thread @@ -684,30 +857,30 @@ class Console(Thread): # while self._operational: - qLen = self._work_q.qsize() + # qLen = self._work_q.qsize() - try: - msg = self._announce_recvr.fetch(timeout = 0) - if msg: - self._dispatch(msg, _direct=False) - except Empty: - pass + while True: + try: + msg = self._announce_recvr.fetch(timeout=0) + except Empty: + break + self._dispatch(msg, _direct=False) - try: - msg = self._direct_recvr.fetch(timeout = 0) - if msg: - self._dispatch(msg, _direct=True) - except Empty: - pass + while True: + try: + msg = self._direct_recvr.fetch(timeout = 0) + except Empty: + break + self._dispatch(msg, _direct=True) self._expireAgents() # check for expired agents - if qLen == 0 and self._work_q.qsize() and self._notifier: - # work queue went non-empty, kick - # the application... - + #if qLen == 0 and self._work_q.qsize() and self._notifier: + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False _callback_thread = currentThread() - logging.info("Calling console indication") + logging.info("Calling console notifier.indication") self._notifier.indication() _callback_thread = None @@ -761,7 +934,7 @@ class Console(Thread): elif opcode == OpCode.object_ind: logging.warning("!!! object_ind TBD !!!") elif opcode == OpCode.response: - logging.warning("!!! response TBD !!!") + self._handleResponseMsg(msg, cmap, version, _direct) elif opcode == OpCode.schema_ind: logging.warning("!!! schema_ind TBD !!!") elif opcode == OpCode.noop: @@ -777,26 +950,29 @@ class Console(Thread): """ logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time())) - if MsgKey.agent_info in cmap: - try: - # TODO: fix - name = cmap[MsgKey.agent_info]["_name"] - except: - logging.warning("Bad agent-ind message received: '%s'" % msg) - return + ai_map = cmap.get(MsgKey.agent_info) + if not ai_map or not isinstance(ai_map, type({})): + logging.warning("Bad agent-ind message received: '%s'" % msg) + return + name = ai_map.get("_name") + if not name: + logging.warning("Bad agent-ind message received: agent name missing" + " '%s'" % msg) + return ignore = True matched = False correlated = False + agent_query = self._agent_discovery_filter + if msg.correlation_id: correlated = self._req_correlation.isValid(msg.correlation_id) + if direct and correlated: ignore = False - elif self._agent_discovery_filter: - logging.error("FIXME: agent discovery filter - new agent name style") - # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) - # ignore = not matched - matched = True; ignore = False # for now + elif agent_query: + matched = agent_query.evaluate(QmfData.create(values=ai_map)) + ignore = not matched if not ignore: agent = None @@ -820,9 +996,9 @@ class Console(Thread): if old_timestamp == None and matched: logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time())) - wi = WorkItem(WorkItem.AGENT_ADDED, - {"agent": agent}) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) + self._work_q_put = True if correlated: # wake up all waiters @@ -847,6 +1023,22 @@ class Console(Thread): self._req_correlation.put_data(msg.correlation_id, msg) + def _handleResponseMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + # @todo code replication - clean me. + logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.error("FIXME: uncorrelated response??? msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.error("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + def _expireAgents(self): """ Check for expired agents and issue notifications when they expire. @@ -865,8 +1057,10 @@ class Console(Thread): if agent_deathtime <= now: logging.debug("AGENT_DELETED for %s" % agent) agent._announce_timestamp = None - wi = WorkItem(WorkItem.AGENT_DELETED, {"agent":agent}) + wi = WorkItem(WorkItem.AGENT_DELETED, None, + {"agent":agent}) self._work_q.put(wi) + self._work_q_put = True else: if (agent_deathtime - now) < next_expire_delta: next_expire_delta = agent_deathtime - now @@ -903,28 +1097,29 @@ class Console(Thread): # new agent - query for its schema database for # seeding the schema cache (@todo) - # query = QmfQuery({QmfQuery._TARGET_SCHEMA_ID:None}) + # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) # agent._sendQuery( query ) return agent - def enableAgentDiscovery(self, query=None): + def enable_agent_discovery(self, _query=None): """ Called to enable the asynchronous Agent Discovery process. Once enabled, AGENT_ADD work items can arrive on the WorkQueue. """ - if query: - if not isinstance(query, QmfQuery): - raise TypeError("Type QmfQuery expected") - self._agent_discovery_filter = query + # @todo: fix - take predicate only, not entire query! + if _query is not None: + if (not isinstance(_query, QmfQuery) or + _query.get_target() != QmfQuery.TARGET_AGENT): + raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") + self._agent_discovery_filter = _query else: # create a match-all agent query (no predicate) - self._agent_discovery_filter = QmfQuery({QmfQuery._TARGET: - {QmfQuery._TARGET_AGENT:None}}) + self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) - def disableAgentDiscovery(self): + def disable_agent_discovery(self): """ Called to disable the async Agent Discovery process enabled by calling enableAgentDiscovery() @@ -933,7 +1128,7 @@ class Console(Thread): - def getWorkItemCount(self): + def get_workitem_count(self): """ Returns the count of pending WorkItems that can be retrieved. """ @@ -941,19 +1136,19 @@ class Console(Thread): - def getNextWorkItem(self, timeout=None): + def get_next_workitem(self, timeout=None): """ Returns the next pending work item, or None if none available. @todo: subclass and return an Empty event instead. """ try: wi = self._work_q.get(True, timeout) - except: + except Queue.Empty: return None return wi - def releaseWorkItem(self, wi): + def release_workitem(self, wi): """ Return a WorkItem to the Console when it is no longer needed. @todo: call Queue.task_done() - only 2.5+ @@ -963,6 +1158,50 @@ class Console(Thread): """ pass + def _add_schema(self, schema): + """ + @todo + """ + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass type expected") + + self._lock.acquire() + try: + sid = schema.get_class_id() + if not self._schema_cache.has_key(sid): + self._schema_cache[sid] = schema + finally: + self._lock.release() + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): + """ + Find the schema identified by schema_id. If not in the cache, ask the + agent for it. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("SchemaClassId type expected") + + self._lock.acquire() + try: + schema = self._schema_cache.get(schema_id) + if schema: + return schema + finally: + self._lock.release() + + if _agent is None: + return None + + # note: doQuery will add the new schema to the cache automatically. + slist = self.doQuery(_agent, + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout) + if slist: + return slist[0] + else: + return None + + # def get_packages(self): # plist = [] @@ -1341,9 +1580,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code - from qmfCommon import (qmfTypes, QmfData, - QmfEvent, SchemaClassId, SchemaEventClass, - SchemaProperty, SchemaObjectClass) + from qmfCommon import (qmfTypes, QmfEvent, SchemaProperty) logging.getLogger().setLevel(logging.INFO) @@ -1362,7 +1599,7 @@ if __name__ == '__main__': _myConsole = Console(notifier=_noteMe) - _myConsole.enableAgentDiscovery() + _myConsole.enable_agent_discovery() logging.info("Waiting...") @@ -1430,16 +1667,16 @@ if __name__ == '__main__': "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, "unit": "seconds", "desc": "time until I retire"}, - "meth1": {"desc": "A test method", - "arguments": + "meth1": {"_desc": "A test method", + "_arguments": {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, "desc": "an argument 1", "dir": "I"}, "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, "dir": "IO", "desc": "some weird boolean"}}}, - "meth2": {"desc": "A test method", - "arguments": + "meth2": {"_desc": "A test method", + "_arguments": {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, "desc": "an 'nuther argument", "dir": @@ -1538,15 +1775,14 @@ if __name__ == '__main__': logging.info( "******** Messing around with Queries ********" ) - _q1 = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, - QmfQuery._PREDICATE: - {QmfQuery._LOGIC_AND: - [{QmfQuery._CMP_EQ: ["vendor", "AVendor"]}, - {QmfQuery._CMP_EQ: ["product", "SomeProduct"]}, - {QmfQuery._CMP_EQ: ["name", "Thingy"]}, - {QmfQuery._LOGIC_OR: - [{QmfQuery._CMP_LE: ["temperature", -10]}, - {QmfQuery._CMP_FALSE: None}, - {QmfQuery._CMP_EXISTS: ["namey"]}]}]}}) - - print("_q1.mapEncode() = [%s]" % _q1) + _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: ["vendor", "AVendor"]}, + {QmfQuery.CMP_EQ: ["product", "SomeProduct"]}, + {QmfQuery.CMP_EQ: ["name", "Thingy"]}, + {QmfQuery.LOGIC_OR: + [{QmfQuery.CMP_LE: ["temperature", -10]}, + {QmfQuery.CMP_FALSE: None}, + {QmfQuery.CMP_EXISTS: ["namey"]}]}]})) + + print("_q1.mapEncode() = [%s]" % _q1.map_encode()) diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py index 9683127a6e..dd55b49316 100644 --- a/qpid/python/qmf/test/agent_test.py +++ b/qpid/python/qmf/test/agent_test.py @@ -5,7 +5,8 @@ from threading import Semaphore from qpid.messaging import * from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, - QmfEvent, SchemaMethod, Notifier, SchemaClassId) + QmfEvent, SchemaMethod, Notifier, SchemaClassId, + WorkItem) from qmfAgent import (Agent, QmfAgentData) @@ -61,14 +62,14 @@ _agent.register_object_class(_schema) # instantiate managed data objects matching the schema -_obj = QmfAgentData( _agent, _schema=_schema ) -_obj.set_value("index1", 100) -_obj.set_value("index2", "a name" ) -_obj.set_value("set_string", "UNSET") -_obj.set_value("set_int", 0) -_obj.set_value("query_count", 0) -_obj.set_value("method_call_count", 0) -_agent.add_object( _obj ) +_obj1 = QmfAgentData( _agent, _schema=_schema ) +_obj1.set_value("index1", 100) +_obj1.set_value("index2", "a name" ) +_obj1.set_value("set_string", "UNSET") +_obj1.set_value("set_int", 0) +_obj1.set_value("query_count", 0) +_obj1.set_value("method_call_count", 0) +_agent.add_object( _obj1 ) _agent.add_object( QmfAgentData( _agent, _schema=_schema, _values={"index1":99, @@ -78,26 +79,52 @@ _agent.add_object( QmfAgentData( _agent, _schema=_schema, "query_count": 0, "method_call_count": 0} )) +# add an "unstructured" object to the Agent +_obj2 = QmfAgentData(_agent, _object_id="01545") +_obj2.set_value("field1", "a value") +_obj2.set_value("field2", 2) +_obj2.set_value("field3", {"a":1, "map":2, "value":3}) +_obj2.set_value("field4", ["a", "list", "value"]) +_agent.add_object(_obj2) + + ## Now connect to the broker _c = Connection("localhost") _c.connect() _agent.setConnection(_c) +_error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) _done = False while not _done: - try: - _notifier.waitForWork() - - _wi = _agent.getNextWorkItem(timeout=0) - while _wi: - print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _agent.releaseWorkItem(_wi) - _wi = _agent.getNextWorkItem(timeout=0) - except: - print( "shutting down..." ) - _done = True + # try: + _notifier.waitForWork() + + _wi = _agent.get_next_workitem(timeout=0) + while _wi: + + if _wi.get_type() == WorkItem.METHOD_CALL: + mc = _wi.get_params() + + if mc.get_name() == "set_meth": + print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) + print("!!! args='%s'" % str(mc.get_args())) + print("!!! userid=%s" % str(mc.get_user_id())) + print("!!! handle=%s" % _wi.get_handle()) + _agent.method_response(_wi.get_handle(), + {"rc1": 100, "rc2": "Success"}) + else: + print("!!! Unknown Method name = %s" % mc.get_name()) + _agent.method_response(_wi.get_handle(), _error=_error_data) + else: + print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) + + _agent.release_workitem(_wi) + _wi = _agent.get_next_workitem(timeout=0) + # except: + # print( "shutting down...") + # _done = True print( "Removing connection... TBD!!!" ) #_myConsole.remove_connection( _c, 10 ) diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py index 6db515dc99..a7703e24a6 100644 --- a/qpid/python/qmf/test/console_test.py +++ b/qpid/python/qmf/test/console_test.py @@ -4,7 +4,8 @@ from threading import Semaphore from qpid.messaging import * -from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass) +from qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, + SchemaClassId, SchemaClass, QmfData) from qmfConsole import Console @@ -33,64 +34,117 @@ _notifier = ExampleNotifier() _myConsole = Console(notifier=_notifier) _myConsole.addConnection( _c ) -# Discover only agents from vendor "redhat.com" that -# are a "qmf" product.... +# Allow discovery only for the agent named "qmf.testAgent" # @todo: replace "manual" query construction with # a formal class-based Query API -_query = {QmfQuery._TARGET: - {QmfQuery._TARGET_AGENT:None}, - QmfQuery._PREDICATE: - {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}} -_query = QmfQuery(_query) - -_myConsole.enableAgentDiscovery(_query) +_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfQuery.KEY_AGENT_NAME, + "qmf.testAgent"]})) +_myConsole.enable_agent_discovery(_query) _done = False while not _done: # try: _notifier.waitForWork() - _wi = _myConsole.getNextWorkItem(timeout=0) + _wi = _myConsole.get_next_workitem(timeout=0) while _wi: - print("!!! work item received %d:%s" % (_wi.getType(), - str(_wi.getParams()))) + print("!!! work item received %d:%s" % (_wi.get_type(), + str(_wi.get_params()))) - if _wi.getType() == _wi.AGENT_ADDED: - _agent = _wi.getParams().get("agent") + if _wi.get_type() == _wi.AGENT_ADDED: + _agent = _wi.get_params().get("agent") if not _agent: print("!!!! AGENT IN REPLY IS NULL !!! ") - _query = QmfQuery( {QmfQuery._TARGET: - {QmfQuery._TARGET_PACKAGES:None}} ) - - _reply = _myConsole.doQuery(_agent, _query) - - package_list = _reply.get(MsgKey.package_info) - for pname in package_list: - print("!!! Querying for schema from package: %s" % pname) - _query = QmfQuery({QmfQuery._TARGET: - {QmfQuery._TARGET_SCHEMA_ID:None}, - QmfQuery._PREDICATE: - {QmfQuery._CMP_EQ: - [SchemaClassId.KEY_PACKAGE, pname]}}) - - _reply = _myConsole.doQuery(_agent, _query) - - schema_id_list = _reply.get(MsgKey.schema_id) - for sid_map in schema_id_list: - _query = QmfQuery({QmfQuery._TARGET: - {QmfQuery._TARGET_SCHEMA:None}, - QmfQuery._PREDICATE: - {QmfQuery._CMP_EQ: - [SchemaClass.KEY_SCHEMA_ID, sid_map]}}) - - _reply = _myConsole.doQuery(_agent, _query) - - - - _myConsole.releaseWorkItem(_wi) - _wi = _myConsole.getNextWorkItem(timeout=0) + _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) + oid_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % oid_list) + + for oid in oid_list: + _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + oid) + obj_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % obj_list) + + if obj_list is None: + obj_list={} + + for obj in obj_list: + resp = obj.invoke_method( "set_meth", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + if not obj.is_described(): + resp = obj.invoke_method( "bad method", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + + #--------------------------------- + #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name") + + #obj_list = _myConsole.doQuery(_agent, _query) + + #--------------------------------- + + # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) + + # package_list = _myConsole.doQuery(_agent, _query) + + # for pname in package_list: + # print("!!! Querying for schema from package: %s" % pname) + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]})) + + # schema_id_list = _myConsole.doQuery(_agent, _query) + # for sid in schema_id_list: + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # schema_list = _myConsole.doQuery(_agent, _query) + # for schema in schema_list: + # sid = schema.get_class_id() + # _query = QmfQuery.create_predicate( + # QmfQuery.TARGET_OBJECT_ID, + # QmfQueryPredicate({QmfQuery.CMP_EQ: + # [QmfData.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # oid_list = _myConsole.doQuery(_agent, _query) + # for oid in oid_list: + # _query = QmfQuery.create_id( + # QmfQuery.TARGET_OBJECT, oid) + # _reply = _myConsole.doQuery(_agent, _query) + + # print("!!!************************** REPLY=%s" % _reply) + + + _myConsole.release_workitem(_wi) + _wi = _myConsole.get_next_workitem(timeout=0) # except: # logging.info( "shutting down..." ) # _done = True |