summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-15 14:29:41 +0000
committerTed Ross <tross@apache.org>2010-01-15 14:29:41 +0000
commit9ebba0e9a3c32d9263365140ae7066ff6f3ef38c (patch)
treed6bba5294eb300a3681b31e5f5eda4b3445e57ed
parentc7e1aca1e90bacbf7ef579ffce686f187243b3c7 (diff)
downloadqpid-python-9ebba0e9a3c32d9263365140ae7066ff6f3ef38c.tar.gz
QPID-2261 - Branch patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@899644 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qmf/qmfAgent.py303
-rw-r--r--qpid/python/qmf/qmfCommon.py415
-rw-r--r--qpid/python/qmf/qmfConsole.py468
-rw-r--r--qpid/python/qmf/test/agent_test.py67
-rw-r--r--qpid/python/qmf/test/console_test.py144
5 files changed, 995 insertions, 402 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index 278eafedbc..89be722833 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -21,13 +21,56 @@ import sys
import logging
import datetime
import time
-from threading import Thread, Lock
+import Queue
+from threading import Thread, Lock, currentThread
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
makeSubject, parseSubject, OpCode, QmfQuery,
SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass)
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+ ##==============================================================================
+ ## METHOD CALL
+ ##==============================================================================
+
+class _MethodCallHandle(object):
+ """
+ Private class used to hold context when handing off a method call to the
+ application. Given to the app in a WorkItem, provided to the agent when
+ method_response() is invoked.
+ """
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+ self.correlation_id = correlation_id
+ self.reply_to = reply_to
+ self.meth_name = meth_name
+ self.oid = _oid
+
+class MethodCallParams(object):
+ """
+ """
+ def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+ self._meth_name = name
+ self._oid = _oid
+ self._in_args = _in_args
+ self._user_id = _user_id
+
+ def get_name(self):
+ return self._meth_name
+
+ def get_object_id(self):
+ return self._oid
+
+ def get_args(self):
+ return self._in_args
+
+ def get_user_id(self):
+ return self._user_id
+
##==============================================================================
@@ -54,6 +97,8 @@ class Agent(Thread):
self._schema_timestamp = long(0)
self._schema = {}
self._agent_data = {}
+ self._work_q = Queue.Queue()
+ self._work_q_put = False
def get_name(self):
return self.name
@@ -63,9 +108,9 @@ class Agent(Thread):
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.debug("my direct addr=%s" % my_addr)
- logging.debug("agent.locate addr=%s" % locate_addr)
- logging.debug("agent.ind addr=%s" % ind_addr)
+ logging.error("my direct addr=%s" % my_addr)
+ logging.error("agent.locate addr=%s" % locate_addr)
+ logging.error("agent.ind addr=%s" % ind_addr)
self._conn = conn
self._session = self._conn.session()
@@ -133,30 +178,60 @@ class Agent(Thread):
self._lock.release()
- def methodResponse(self, context, status, text, arguments):
- logging.error("!!!Agent.methodResponse() TBD!!!")
+ def method_response(self, handle, _out_args=None, _error=None):
+ """
+ """
+ if not isinstance(handle, _MethodCallHandle):
+ raise TypeError("Invalid handle passed to method_response!")
+
+ _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+ if handle.oid is not None:
+ _map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if _out_args is not None:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+ if _error is not None:
+ if not isinstance(_error, QmfData):
+ raise TypeError("Invalid type for error - must be QmfData")
+ _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+ msg = Message(subject=makeSubject(OpCode.response),
+ properties={"method":"response"},
+ content={MsgKey.method:_map})
+ msg.correlation_id = handle.correlation_id
+
+ try:
+ tmp_snd = self._session.sender( handle.reply_to )
+ tmp_snd.send(msg)
+ logging.debug("method-response sent to [%s]" % handle.reply_to)
+ except SendError, e:
+ logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
- def getWorkItemCount(self):
+ def get_workitem_count(self):
"""
Returns the count of pending WorkItems that can be retrieved.
"""
- logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+ return self._work_q.qsize()
- def getNextWorkItem(self, timeout=None):
+ def get_next_workitem(self, timeout=None):
"""
Obtains the next pending work item, or None if none available.
"""
- logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
- def releaseWorkItem(self, wi):
+ def release_workitem(self, wi):
"""
Releases a WorkItem instance obtained by getNextWorkItem(). Called when
the application has finished processing the WorkItem.
"""
- logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+ pass
def run(self):
+ global _callback_thread
next_heartbeat = datetime.datetime.utcnow()
while self._running:
@@ -174,19 +249,29 @@ class Agent(Thread):
except Empty:
continue
- try:
- msg = self._locate_receiver.fetch(timeout = 0)
- except Empty:
- msg = None
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
-
- try:
- msg = self._direct_receiver.fetch(timeout = 0)
- except Empty:
- msg = None
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
+ while True:
+ try:
+ msg = self._locate_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
+
+ while True:
+ try:
+ msg = self._direct_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=True)
+
+ if self._work_q_put and self._notifier:
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
+ _callback_thread = currentThread()
+ logging.info("Calling agent notifier.indication")
+ self._notifier.indication()
+ _callback_thread = None
#
# Private:
@@ -227,7 +312,7 @@ class Agent(Thread):
elif opcode == OpCode.get_query:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
- logging.warning("!!! METHOD_REQ TBD !!!")
+ self._handleMethodReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.cancel_subscription:
logging.warning("!!! CANCEL_SUB TBD !!!")
elif opcode == OpCode.create_subscription:
@@ -253,7 +338,7 @@ class Agent(Thread):
query = cmap.get(MsgKey.query)
if query is not None:
# fake a QmfData containing my identifier for the query compare
- tmpData = QmfData(_values={"_name": self.get_name()})
+ tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
reply = QmfQuery(query).evaluate(tmpData)
if reply:
@@ -278,24 +363,53 @@ class Agent(Thread):
if "method" in props and props["method"] == "request":
qmap = cmap.get(MsgKey.query)
if qmap:
- query = QmfQuery(qmap)
- target = query.getTarget()
- if target == QmfQuery._TARGET_PACKAGES:
+ query = QmfQuery.from_map(qmap)
+ target = query.get_target()
+ if target == QmfQuery.TARGET_PACKAGES:
self._queryPackages( msg, query )
- elif target == QmfQuery._TARGET_SCHEMA_ID:
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
self._querySchema( msg, query, _idOnly=True )
- elif target == QmfQuery._TARGET_SCHEMA:
+ elif target == QmfQuery.TARGET_SCHEMA:
self._querySchema( msg, query)
- elif target == QmfQuery._TARGET_AGENT:
- logging.warning("!!! Query TARGET=AGENT TBD !!!")
- elif target == QmfQuery._TARGET_OBJECT_ID:
- logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!")
- elif target == QmfQuery._TARGET_OBJECT:
- logging.warning("!!! Query TARGET=OBJECT TBD !!!")
+ elif target == QmfQuery.TARGET_AGENT:
+ logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ self._queryData(msg, query, _idOnly=True)
+ elif target == QmfQuery.TARGET_OBJECT:
+ self._queryData(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
+
+ def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Method Request
+ """
+ if "method" in props and props["method"] == "request":
+ mname = cmap.get(SchemaMethod.KEY_NAME)
+ if not mname:
+ logging.warning("Invalid method call from '%s': no name"
+ % msg.reply_to)
+ return
+
+ in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+ oid = cmap.get(QmfData.KEY_OBJECT_ID)
+
+ print("!!! ci=%s rt=%s mn=%s oid=%s" %
+ (msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid))
+
+ handle = _MethodCallHandle(msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid)
+ param = MethodCallParams( mname, oid, in_args, msg.user_id)
+ self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+ self._work_q_put = True
+
def _queryPackages(self, msg, query):
"""
Run a query against the list of known packages
@@ -304,7 +418,7 @@ class Agent(Thread):
self._lock.acquire()
try:
for name in self._packages.iterkeys():
- if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
+ if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
pnames.append(name)
finally:
self._lock.release()
@@ -326,36 +440,97 @@ class Agent(Thread):
"""
"""
schemas = []
- self._lock.acquire()
- try:
- for sid,val in self._schema.iteritems():
- if query.evaluate(val):
- if _idOnly:
- schemas.append(sid.map_encode())
- else:
- schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._schema.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
+ self._lock.acquire()
+ try:
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
+ finally:
+ self._lock.release()
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- if _idOnly:
- content = {MsgKey.schema_id: schemas}
- else:
- content = {MsgKey.schema:schemas}
+ tmp_snd = self._session.sender( msg.reply_to )
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ if _idOnly:
+ content = {MsgKey.schema_id: schemas}
+ else:
+ content = {MsgKey.schema:schemas}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
tmp_snd.send(m)
logging.debug("schema_id sent to [%s]" % msg.reply_to)
except SendError, e:
logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+ def _queryData( self, msg, query, _idOnly=False ):
+ """
+ """
+ data_objs = []
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._agent_data.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ data_objs.append(query.get_id())
+ else:
+ data_objs.append(found.map_encode())
+ else: # otherwise, evaluate all data
+ self._lock.acquire()
+ try:
+ for oid,val in self._agent_data.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(val.map_encode())
+ finally:
+ self._lock.release()
+
+ tmp_snd = self._session.sender( msg.reply_to )
+
+ if _idOnly:
+ content = {MsgKey.object_id:data_objs}
+ else:
+ content = {MsgKey.data_obj:data_objs}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
+ tmp_snd.send(m)
+ logging.debug("data reply sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
##==============================================================================
@@ -407,7 +582,7 @@ class QmfAgentData(QmfData):
if __name__ == '__main__':
# static test cases - no message passing, just exercise API
- from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+ from qmfCommon import (AgentName, SchemaProperty, qmfTypes,
SchemaMethod, SchemaEventClass)
logging.getLogger().setLevel(logging.INFO)
@@ -436,8 +611,8 @@ if __name__ == '__main__':
# add method
_meth = SchemaMethod(_desc="Method to set string and int in object." )
- _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
_schema.add_method( "set_meth", _meth )
# Add schema to Agent
diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py
index 9bcb166c6b..580d86b7a7 100644
--- a/qpid/python/qmf/qmfCommon.py
+++ b/qpid/python/qmf/qmfCommon.py
@@ -49,6 +49,9 @@ class MsgKey(object):
package_info = "package_info"
schema_id = "schema_id"
schema = "schema"
+ object_id="object_id"
+ data_obj="object"
+ method="method"
class OpCode(object):
@@ -92,6 +95,11 @@ def parseSubject(_sub):
return _sub[3:].split('.', 1)
+##==============================================================================
+## Async Event Model
+##==============================================================================
+
+
class Notifier(object):
"""
Virtual base class that defines a call back which alerts the application that
@@ -107,6 +115,48 @@ class Notifier(object):
+class WorkItem(object):
+ """
+ Describes an event that has arrived for the application to process. The
+ Notifier is invoked when one or more of these WorkItems become available
+ for processing.
+ """
+ # Enumeration of the types of WorkItems produced on the Console
+ AGENT_ADDED=1
+ AGENT_DELETED=2
+ NEW_PACKAGE=3
+ NEW_CLASS=4
+ OBJECT_UPDATE=5
+ EVENT_RECEIVED=7
+ AGENT_HEARTBEAT=8
+ # Enumeration of the types of WorkItems produced on the Agent
+ METHOD_CALL=1000
+ QUERY=1001
+ SUBSCRIBE=1002
+ UNSUBSCRIBE=1003
+
+ def __init__(self, kind, handle, _params=None):
+ """
+ Used by the Console to create a work item.
+
+ @type kind: int
+ @param kind: work item type
+ """
+ self._kind = kind
+ self._handle = handle
+ self._params = _params
+
+ def get_type(self):
+ return self._kind
+
+ def get_handle(self):
+ return self._handle
+
+ def get_params(self):
+ return self._params
+
+
+
##==============================================================================
## Addressing
##==============================================================================
@@ -155,15 +205,15 @@ class AgentName(object):
"""
_separator = ":"
- def __init__(self, vendor, product, name, str_=None):
+ def __init__(self, vendor, product, name, _str=None):
"""
Note: this object must be immutable, as it is used to index into a dictionary
"""
- if str_:
+ if _str is not None:
# construct from string representation
- if _str.count(AgentId._separator) < 2:
- raise TypeError("AgentId string format must be 'vendor.product.name'")
- self._vendor, self._product, self._name = param.split(AgentId._separator)
+ if _str.count(AgentName._separator) < 2:
+ raise TypeError("AgentName string format must be 'vendor.product.name'")
+ self._vendor, self._product, self._name = _str.split(AgentName._separator)
else:
self._vendor = vendor
self._product = product
@@ -299,9 +349,9 @@ class QmfData(_mapEncoder):
_object_id=_object_id, _schema=_schema, _const=_const)
create = classmethod(_create)
- def _from_map(cls, map_, _schema=None, _const=False):
+ def __from_map(cls, map_, _schema=None, _const=False):
return cls(_map=map_, _schema=_schema, _const=_const)
- from_map = classmethod(_from_map)
+ from_map = classmethod(__from_map)
def is_managed(self):
return self._object_id is not None
@@ -671,9 +721,9 @@ class Arguments(object):
-class MethodResponse(object):
- def __init__(self, impl):
- pass
+#class MethodResponse(object):
+# def __init__(self, impl):
+# pass
# self.impl = qmfengine.MethodResponse(impl)
@@ -756,140 +806,173 @@ class MethodResponse(object):
class QmfQuery(_mapEncoder):
- _TARGET="what"
- _PREDICATE="where"
+ KEY_TARGET="what"
+ KEY_PREDICATE="where"
+ KEY_ID="id"
+
+ ### Query Types
+ ID=1
+ PREDICATE=2
#### Query Targets ####
- _TARGET_PACKAGES="schema_package"
+ TARGET_PACKAGES="schema_package"
# (returns just package names)
- # predicate key(s):
+ # allowed predicate key(s):
#
- #_PRED_PACKAGE
+ # SchemaClassId.KEY_PACKAGE
-
- _TARGET_SCHEMA_ID="schema_id"
- _TARGET_SCHEMA="schema"
- # predicate key(s):
+ TARGET_SCHEMA_ID="schema_id"
+ TARGET_SCHEMA="schema"
+ # allowed predicate key(s):
#
- #_PRED_PACKAGE
- #_PRED_CLASS
- #_PRED_TYPE
- #_PRED_HASH
- #_PRED_SCHEMA_ID
+ # SchemaClassId.KEY_PACKAGE
+ # SchemaClassId.KEY_CLASS
+ # SchemaClassId.KEY_TYPE
+ # SchemaClassId.KEY_HASH
+ # SchemaClass.KEY_SCHEMA_ID
# name of property (exist test only)
# name of method (exist test only)
-
- _TARGET_AGENT="agent"
- # predicate keys(s):
+ TARGET_AGENT="agent"
+ # allowed predicate keys(s):
#
- #_PRED_VENDOR="_vendor"
- #_PRED_PRODUCT="_product"
- #_PRED_NAME="_name"
-
- _TARGET_OBJECT_ID="object_id"
- _TARGET_OBJECT="object"
- # package and class names must be suppled in the target value:
- # predicate on all values or meta-values[tbd]
+ KEY_AGENT_NAME="_name"
+
+ TARGET_OBJECT_ID="object_id"
+ TARGET_OBJECT="object"
+ # allowed predicate keys(s):
#
- #_PRED_PACKAGE
- #_PRED_CLASS
- #_PRED_TYPE
- #_PRED_HASH
- #_primary_key
- #_PRED_SCHEMA_ID
- #_PRED_OBJECT_ID
- #_PRED_UPDATE_TS
- #_PRED_CREATE_TS
- #_PRED_DELETE_TS
- #<name of property>
-
- _PRED_PACKAGE="_package_name"
- _PRED_CLASS="_class_name"
- _PRED_TYPE="_type"
- _PRED_HASH="_hash_str"
- _PRED_VENDOR="_vendor"
- _PRED_PRODUCT="_product"
- _PRED_NAME="_name"
- _PRED_PRIMARY_KEY="_primary_key"
- _PRED_SCHEMA_ID="_schema_id"
- _PRED_OBJECT_ID="_object_id"
- _PRED_UPDATE_TS="_update_ts"
- _PRED_CREATE_TS="_create_ts"
- _PRED_DELETE_TS="_delete_ts"
-
- _CMP_EQ="eq"
- _CMP_NE="ne"
- _CMP_LT="lt"
- _CMP_LE="le"
- _CMP_GT="gt"
- _CMP_GE="ge"
- _CMP_RE_MATCH="re_match"
- _CMP_EXISTS="exists"
- _CMP_TRUE="true"
- _CMP_FALSE="false"
-
- _LOGIC_AND="and"
- _LOGIC_OR="or"
- _LOGIC_NOT="not"
-
- _valid_targets = [_TARGET_PACKAGES, _TARGET_OBJECT_ID, _TARGET_SCHEMA, _TARGET_SCHEMA_ID,
- _TARGET_OBJECT, _TARGET_AGENT]
-
- def __init__(self, qmap):
+ # SchemaClassId.KEY_PACKAGE
+ # SchemaClassId.KEY_CLASS
+ # SchemaClassId.KEY_TYPE
+ # SchemaClassId.KEY_HASH
+ # QmfData.KEY_SCHEMA_ID
+ # QmfData.KEY_OBJECT_ID
+ # QmfData.KEY_UPDATE_TS
+ # QmfData.KEY_CREATE_TS
+ # QmfData.KEY_DELETE_TS
+ # <name of data value>
+
+ CMP_EQ="eq"
+ CMP_NE="ne"
+ CMP_LT="lt"
+ CMP_LE="le"
+ CMP_GT="gt"
+ CMP_GE="ge"
+ CMP_RE_MATCH="re_match"
+ CMP_EXISTS="exists"
+ CMP_TRUE="true"
+ CMP_FALSE="false"
+
+ LOGIC_AND="and"
+ LOGIC_OR="or"
+ LOGIC_NOT="not"
+
+ _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID,
+ TARGET_OBJECT, TARGET_AGENT]
+
+ def __init__(self, _target=None, _target_params=None, _predicate=None,
+ _id=None, _map=None):
"""
"""
- self._target_map = None
- self._predicate = None
+ if _map is not None:
+ target_map = _map.get(self.KEY_TARGET)
+ if not target_map:
+ raise TypeError("QmfQuery requires a target map")
+
+ _target = None
+ for key in target_map.iterkeys():
+ if key in self._valid_targets:
+ _target = key
+ break
- if type(qmap) != dict:
- raise TypeError("constructor must be of type dict")
+ _target_params = target_map.get(_target)
+
+ _id = _map.get(self.KEY_ID)
+ if _id is not None:
+ # Convert identifier to native type if necessary
+ if _target == self.TARGET_SCHEMA:
+ _id = SchemaClassId.from_map(_id)
+ else:
+ pred = _map.get(self.KEY_PREDICATE)
+ if pred:
+ _predicate = QmfQueryPredicate(pred)
+
+ self._target = _target
+ if not self._target:
+ raise TypeError("QmfQuery requires a target value")
+ self._target_params = _target_params
+ self._predicate = _predicate
+ self._id = _id
+
+ # constructors
+ def _create_wildcard(cls, target, _target_params=None):
+ return cls(_target=target, _target_params=_target_params)
+ create_wildcard = classmethod(_create_wildcard)
+
+ def _create_predicate(cls, target, predicate, _target_params=None):
+ return cls(_target=target, _target_params=_target_params,
+ _predicate=predicate)
+ create_predicate = classmethod(_create_predicate)
+
+ def _create_id(cls, target, ident, _target_params=None):
+ return cls(_target=target, _target_params=_target_params, _id=ident)
+ create_id = classmethod(_create_id)
- if self._TARGET in qmap:
- self._target_map = qmap[self._TARGET]
- if self._PREDICATE in qmap:
- self.setPredicate(qmap[self._PREDICATE])
- return
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+
+ def get_target(self):
+ return self._target
+
+ def get_target_param(self):
+ return self._target_params
+
+ def get_selector(self):
+ if self._id:
+ return QmfQuery.ID
else:
- # assume qmap to be the target map
- self._target_map = qmap[:]
+ return QmfQuery.PREDICATE
+ def get_id(self):
+ return self._id
- def setPredicate(self, predicate):
+ def get_predicate(self):
"""
"""
- if isinstance(predicate, QmfQueryPredicate):
- self._predicate = predicate
- elif type(predicate) == dict:
- self._predicate = QmfQueryPredicate(predicate)
- else:
- raise TypeError("Invalid type for a predicate: %s" % str(predicate))
-
+ return self._predicate
def evaluate(self, qmfData):
"""
"""
- # @todo: how to interpred qmfData against target??????
- #
+ if self._id:
+ if self._target == self.TARGET_SCHEMA:
+ return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and
+ qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id)
+ elif self._target == self.TARGET_OBJECT:
+ return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and
+ qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id)
+ elif self._target == self.TARGET_AGENT:
+ return (qmfData.has_value(self.KEY_AGENT_NAME) and
+ qmfData.get_value(self.KEY_AGENT_NAME) == self._id)
+
+ raise Exception("Unsupported query target '%s'" % str(self._target))
+
if self._predicate:
return self._predicate.evaluate(qmfData)
- # no predicate - always match
+ # no predicate and no id - always match
return True
- def getTarget(self):
- for key in self._target_map.iterkeys():
- if key in self._valid_targets:
- return key
- return None
-
- def getPredicate(self):
- return self._predicate
-
def map_encode(self):
- _map = {}
- _map[self._TARGET] = self._target_map
- if self._predicate is not None:
- _map[self._PREDICATE] = self._predicate.map_encode()
+ _map = {self.KEY_TARGET: {self._target: self._target_params}}
+ if self._id is not None:
+ if isinstance(self._id, _mapEncoder):
+ _map[self.KEY_ID] = self._id.map_encode()
+ else:
+ _map[self.KEY_ID] = self._id
+ elif self._predicate is not None:
+ _map[self.KEY_PREDICATE] = self._predicate.map_encode()
return _map
def __repr__(self):
@@ -901,11 +984,11 @@ class QmfQueryPredicate(_mapEncoder):
"""
Class for Query predicates.
"""
- _valid_cmp_ops = [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT,
- QmfQuery._CMP_GT, QmfQuery._CMP_LE, QmfQuery._CMP_GE,
- QmfQuery._CMP_EXISTS, QmfQuery._CMP_RE_MATCH,
- QmfQuery._CMP_TRUE, QmfQuery._CMP_FALSE]
- _valid_logic_ops = [QmfQuery._LOGIC_AND, QmfQuery._LOGIC_OR, QmfQuery._LOGIC_NOT]
+ _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT,
+ QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE,
+ QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH,
+ QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE]
+ _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT]
def __init__( self, pmap):
@@ -919,7 +1002,7 @@ class QmfQueryPredicate(_mapEncoder):
if type(pmap) == dict:
for key in pmap.iterkeys():
if key in self._valid_cmp_ops:
- # coparison operation - may have "name" and "value"
+ # comparison operation - may have "name" and "value"
self._oper = key
break
if key in self._valid_logic_ops:
@@ -955,16 +1038,16 @@ class QmfQueryPredicate(_mapEncoder):
if not isinstance(qmfData, QmfData):
raise TypeError("Query expects to evaluate QmfData types.")
- if self._oper == QmfQuery._CMP_TRUE:
+ if self._oper == QmfQuery.CMP_TRUE:
logging.debug("query evaluate TRUE")
return True
- if self._oper == QmfQuery._CMP_FALSE:
+ if self._oper == QmfQuery.CMP_FALSE:
logging.debug("query evaluate FALSE")
return False
- if self._oper in [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT,
- QmfQuery._CMP_LE, QmfQuery._CMP_GT, QmfQuery._CMP_GE,
- QmfQuery._CMP_RE_MATCH]:
+ if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT,
+ QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE,
+ QmfQuery.CMP_RE_MATCH]:
if len(self._operands) != 2:
logging.warning("Malformed query compare expression received: '%s, %s'" %
(self._oper, str(self._operands)))
@@ -982,13 +1065,13 @@ class QmfQueryPredicate(_mapEncoder):
logging.debug("query evaluate %s: '%s' '%s' '%s'" %
(name, str(arg1), self._oper, str(arg2)))
try:
- if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2
- if self._oper == QmfQuery._CMP_NE: return arg1 != arg2
- if self._oper == QmfQuery._CMP_LT: return arg1 < arg2
- if self._oper == QmfQuery._CMP_LE: return arg1 <= arg2
- if self._oper == QmfQuery._CMP_GT: return arg1 > arg2
- if self._oper == QmfQuery._CMP_GE: return arg1 >= arg2
- if self._oper == QmfQuery._CMP_RE_MATCH:
+ if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2
+ if self._oper == QmfQuery.CMP_NE: return arg1 != arg2
+ if self._oper == QmfQuery.CMP_LT: return arg1 < arg2
+ if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2
+ if self._oper == QmfQuery.CMP_GT: return arg1 > arg2
+ if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2
+ if self._oper == QmfQuery.CMP_RE_MATCH:
logging.error("!!! RE QUERY TBD !!!")
return False
except:
@@ -998,7 +1081,7 @@ class QmfQueryPredicate(_mapEncoder):
return False
- if self._oper == QmfQuery._CMP_EXISTS:
+ if self._oper == QmfQuery.CMP_EXISTS:
if len(self._operands) != 1:
logging.warning("Malformed query present expression received")
return False
@@ -1006,21 +1089,21 @@ class QmfQueryPredicate(_mapEncoder):
logging.debug("query evaluate PRESENT: [%s]" % str(name))
return qmfData.has_value(name)
- if self._oper == QmfQuery._LOGIC_AND:
+ if self._oper == QmfQuery.LOGIC_AND:
logging.debug("query evaluate AND: '%s'" % str(self._operands))
for exp in self._operands:
if not exp.evaluate(qmfData):
return False
return True
- if self._oper == QmfQuery._LOGIC_OR:
+ if self._oper == QmfQuery.LOGIC_OR:
logging.debug("query evaluate OR: [%s]" % str(self._operands))
for exp in self._operands:
if exp.evaluate(qmfData):
return True
return False
- if self._oper == QmfQuery._LOGIC_NOT:
+ if self._oper == QmfQuery.LOGIC_NOT:
logging.debug("query evaluate NOT: [%s]" % str(self._operands))
for exp in self._operands:
if exp.evaluate(qmfData):
@@ -1129,7 +1212,7 @@ class SchemaClassId(_mapEncoder):
KEY_HASH="_hash_str"
TYPE_DATA = "_data"
- TYPE_EVENT = "event"
+ TYPE_EVENT = "_event"
_valid_types=[TYPE_DATA, TYPE_EVENT]
_schemaHashStrFormat = "%08x-%08x-%08x-%08x"
@@ -1333,7 +1416,7 @@ class SchemaProperty(_mapEncoder):
def getAccess(self): return self._access
- def isOptional(self): return self._isOptional
+ def is_optional(self): return self._isOptional
def isIndex(self): return self._isIndex
@@ -1351,9 +1434,9 @@ class SchemaProperty(_mapEncoder):
def isParentRef(self): return self._isParentRef
- def getDirection(self): return self._dir
+ def get_direction(self): return self._dir
- def getDefault(self): return self._default
+ def get_default(self): return self._default
def map_encode(self):
"""
@@ -1402,7 +1485,11 @@ class SchemaMethod(_mapEncoder):
map["arguments"] = map of "name"=<SchemaProperty> pairs.
map["desc"] = str, description of the method
"""
- def __init__(self, args={}, _desc=None, _map=None):
+ KEY_NAME="_name"
+ KEY_ARGUMENTS="_arguments"
+ KEY_DESC="_desc"
+ KEY_ERROR="_error"
+ def __init__(self, _args={}, _desc=None, _map=None):
"""
Construct a SchemaMethod.
@@ -1412,14 +1499,16 @@ class SchemaMethod(_mapEncoder):
@param _desc: Human-readable description of the schema
"""
if _map is not None:
- _desc = _map.get("desc")
- margs = _map.get("arguments", args)
- # margs are in map format - covert to SchemaProperty
- args = {}
- for name,val in margs.iteritems():
- args[name] = SchemaProperty.from_map(val)
-
- self._arguments = args.copy()
+ _desc = _map.get(self.KEY_DESC)
+ margs = _map.get(self.KEY_ARGUMENTS)
+ if margs:
+ # margs are in map format - covert to SchemaProperty
+ tmp_args = {}
+ for name,val in margs.iteritems():
+ tmp_args[name] = SchemaProperty.from_map(val)
+ _args=tmp_args
+
+ self._arguments = _args.copy()
self._desc = _desc
# map constructor
@@ -1427,13 +1516,13 @@ class SchemaMethod(_mapEncoder):
return cls(_map=map_)
from_map = classmethod(_from_map)
- def getDesc(self): return self._desc
+ def get_desc(self): return self._desc
- def getArgCount(self): return len(self._arguments)
+ def get_arg_count(self): return len(self._arguments)
- def getArguments(self): return self._arguments.copy()
+ def get_arguments(self): return self._arguments.copy()
- def getArgument(self, name): return self._arguments[name]
+ def get_argument(self, name): return self._arguments.get(name)
def add_argument(self, name, schema):
"""
@@ -1447,6 +1536,9 @@ class SchemaMethod(_mapEncoder):
"""
if not isinstance(schema, SchemaProperty):
raise TypeError("argument must be a SchemaProperty class")
+ # "Input" argument, by default
+ if schema._dir is None:
+ schema._dir = "I"
self._arguments[name] = schema
def map_encode(self):
@@ -1457,8 +1549,8 @@ class SchemaMethod(_mapEncoder):
_args = {}
for name,val in self._arguments.iteritems():
_args[name] = val.map_encode()
- _map["arguments"] = _args
- if self._desc: _map["desc"] = self._desc
+ _map[self.KEY_ARGUMENTS] = _args
+ if self._desc: _map[self.KEY_DESC] = self._desc
return _map
def __repr__(self):
@@ -1493,7 +1585,6 @@ class SchemaClass(QmfData):
map["_schema_id"] = map representation of a SchemaClassId instance
map["_primary_key_names"] = order list of primary key names
"""
- KEY_SCHEMA_ID="_schema_id"
KEY_PRIMARY_KEY_NAMES="_primary_key_names"
KEY_DESC = "_desc"
@@ -1671,6 +1762,11 @@ class SchemaObjectClass(SchemaClass):
if self._classId.get_type() != SchemaClassId.TYPE_DATA:
raise TypeError("Invalid ClassId type for data schema: %s" % self._classId)
+ # map constructor
+ def __from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(__from_map)
+
def get_id_names(self):
return self._object_id_names[:]
@@ -1725,6 +1821,11 @@ class SchemaEventClass(SchemaClass):
raise TypeError("Invalid ClassId type for event schema: %s" %
self._classId)
+ # map constructor
+ def __from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(__from_map)
+
diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py
index 027ce163e5..97acdd767e 100644
--- a/qpid/python/qmf/qmfConsole.py
+++ b/qpid/python/qmf/qmfConsole.py
@@ -28,16 +28,18 @@ from threading import Lock
from threading import currentThread
from threading import Condition
-from qpid.messaging import *
+from qpid.messaging import Connection, Message, Empty, SendError
from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
- AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION)
+ AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+ SchemaClass, SchemaClassId, SchemaEventClass,
+ SchemaObjectClass, WorkItem, SchemaMethod)
# global flag that indicates which thread (if any) is
-# running the console callback
+# running the console notifier callback
_callback_thread=None
@@ -243,13 +245,86 @@ class QmfConsoleData(QmfData):
logging.error(" TBD!!!")
return None
- def invoke_method(self, name, _in_args=None, _reply_handle=None,
+ def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
"""
invoke the named method.
"""
- logging.error(" TBD!!!")
- return None
+ assert self._agent
+ assert self._agent._console
+
+ oid = self.get_object_id()
+ if oid is None:
+ raise ValueError("Cannot invoke methods on unmanaged objects.")
+
+ if _timeout is None:
+ _timeout = self._agent._console._reply_timeout
+
+ if _in_args:
+ _in_args = _in_args.copy()
+
+ if self._schema:
+ # validate
+ ms = self._schema.get_method(name)
+ if ms is None:
+ raise ValueError("Method '%s' is undefined." % ms)
+
+ for aname,prop in ms.get_arguments().iteritems():
+ if aname not in _in_args:
+ if prop.get_default():
+ _in_args[aname] = prop.get_default()
+ elif not prop.is_optional():
+ raise ValueError("Method '%s' requires argument '%s'"
+ % (name, aname))
+ for aname in _in_args.iterkeys():
+ prop = ms.get_argument(aname)
+ if prop is None:
+ raise ValueError("Method '%s' does not define argument"
+ " '%s'" % (name, aname))
+ if "I" not in prop.get_direction():
+ raise ValueError("Method '%s' argument '%s' is not an"
+ " input." % (name, aname))
+
+ # @todo check if value is correct (type, range, etc)
+
+ handle = self._agent._console._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+
+ _map = {self.KEY_OBJECT_ID:str(oid),
+ SchemaMethod.KEY_NAME:name}
+ if _in_args:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+ logging.debug("Sending method req to Agent (%s)" % time.time())
+ try:
+ self._agent._sendMethodReq(_map, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._agent._console._req_correlation.release(handle)
+ return None
+
+ # @todo async method calls!!!
+ if _reply_handle is not None:
+ print("ASYNC TBD")
+
+ logging.debug("Waiting for response to method req (%s)" % _timeout)
+ replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
+ self._agent._console._req_correlation.release(handle)
+ if not replyMsg:
+ logging.debug("Agent method req wait timed-out.")
+ return None
+
+ _map = replyMsg.content.get(MsgKey.method)
+ if not _map:
+ logging.error("Invalid method call reply message")
+ return None
+
+ error=_map.get(SchemaMethod.KEY_ERROR)
+ if error:
+ return MethodResult(_error=QmfData.from_map(error))
+ else:
+ return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
@@ -364,6 +439,54 @@ class Agent(object):
"""
pass
+
+ def invoke_method(self, name, _in_args={}, _reply_handle=None,
+ _timeout=None):
+ """
+ """
+ assert self._console
+
+ if _timeout is None:
+ _timeout = self._console._reply_timeout
+
+ if _in_args:
+ _in_args = _in_args.copy()
+
+ handle = self._console._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+
+ _map = {SchemaMethod.KEY_NAME:name}
+ if _in_args:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+ logging.debug("Sending method req to Agent (%s)" % time.time())
+ try:
+ self._sendMethodReq(_map, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._console._req_correlation.release(handle)
+ return None
+
+ # @todo async method calls!!!
+ if _reply_handle is not None:
+ print("ASYNC TBD")
+
+ logging.debug("Waiting for response to method req (%s)" % _timeout)
+ replyMsg = self._console._req_correlation.get_data(handle, _timeout)
+ self._console._req_correlation.release(handle)
+ if not replyMsg:
+ logging.debug("Agent method req wait timed-out.")
+ return None
+
+ _map = replyMsg.content.get(MsgKey.method)
+ if not _map:
+ logging.error("Invalid method call reply message")
+ return None
+
+ return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
+ _error=_map.get(SchemaMethod.KEY_ERROR))
+
def __repr__(self):
return str(self._address)
@@ -379,45 +502,46 @@ class Agent(object):
self._sendMsg( msg, correlation_id )
+ def _sendMethodReq(self, mr_map, correlation_id=None):
+ """
+ """
+ msg = Message(subject=makeSubject(OpCode.method_req),
+ properties={"method":"request"},
+ content=mr_map)
+ self._sendMsg( msg, correlation_id )
+
+
##==============================================================================
- ## CONSOLE
+ ## METHOD CALL
##==============================================================================
+class MethodResult(object):
+ def __init__(self, _out_args=None, _error=None):
+ self._error = _error
+ self._out_args = _out_args
+ def succeeded(self):
+ return self._error is None
-class WorkItem(object):
- """
- Describes an event that has arrived at the Console for the
- application to process. The Notifier is invoked when one or
- more of these WorkItems become available for processing.
- """
- #
- # Enumeration of the types of WorkItems produced by the Console
- #
- AGENT_ADDED = 1
- AGENT_DELETED = 2
- NEW_PACKAGE = 3
- NEW_CLASS = 4
- OBJECT_UPDATE = 5
- EVENT_RECEIVED = 7
- AGENT_HEARTBEAT = 8
+ def get_exception(self):
+ return self._error
- def __init__(self, kind, kwargs={}):
- """
- Used by the Console to create a work item.
-
- @type kind: int
- @param kind: work item type
- """
- self._kind = kind
- self._param_map = kwargs
+ def get_arguments(self):
+ return self._out_args
+
+ def get_argument(self, name):
+ arg = None
+ if self._out_args:
+ arg = self._out_args.get(name)
+ return arg
+
+
+ ##==============================================================================
+ ## CONSOLE
+ ##==============================================================================
- def getType(self):
- return self._kind
- def getParams(self):
- return self._param_map
@@ -465,6 +589,7 @@ class Console(Thread):
self._cv = Condition()
# for passing WorkItems to the application
self._work_q = Queue.Queue()
+ self._work_q_put = False
## Old stuff below???
#self._broker_list = []
#self.impl = qmfengine.Console()
@@ -512,14 +637,15 @@ class Console(Thread):
" x-properties:"
" {type:direct}}}",
capacity=1)
+ logging.error("local addr=%s" % self._address)
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.debug("agent.ind addr=%s" % ind_addr)
+ logging.error("agent.ind addr=%s" % ind_addr)
self._announce_recvr = self._session.receiver(str(ind_addr) +
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- logging.debug("agent.locate addr=%s" % locate_addr)
+ logging.error("agent.locate addr=%s" % locate_addr)
self._locate_sender = self._session.sender(str(locate_addr) +
";{create:always,"
" node-properties:{type:topic}}")
@@ -615,9 +741,7 @@ class Console(Thread):
" x-properties:"
" {type:direct}}}")
- query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
- QmfQuery._PREDICATE:
- {QmfQuery._CMP_EQ: ["_name", name]}})
+ query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject=makeSubject(OpCode.agent_locate),
properties={"method":"request"},
content={MsgKey.query: query.map_encode()})
@@ -630,7 +754,7 @@ class Console(Thread):
self._req_correlation.release(handle)
return None
- if not timeout:
+ if timeout is None:
timeout = self._reply_timeout
new_agent = None
@@ -650,6 +774,7 @@ class Console(Thread):
"""
"""
+ target = query.get_target()
handle = self._req_correlation.allocate()
if handle == 0:
raise Exception("Can not allocate a correlation id!")
@@ -667,15 +792,63 @@ class Console(Thread):
logging.debug("Waiting for response to Query (%s)" % timeout)
reply = self._req_correlation.get_data(handle, timeout)
self._req_correlation.release(handle)
- logging.debug("Agent Query wait ended (%s)" % time.time())
- if reply:
- print("Agent Query Reply='%s'" % reply)
- return reply.content
- else:
- print("Agent Query FAILED!!!")
+ if not reply:
+ logging.debug("Agent Query wait timed-out.")
return None
-
+ if target == QmfQuery.TARGET_PACKAGES:
+ # simply pass back the list of package names
+ logging.debug("Response to Packet Query received")
+ return reply.content.get(MsgKey.package_info)
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ # simply pass back the list of object_id's
+ logging.debug("Response to Object Id Query received")
+ return reply.content.get(MsgKey.object_id)
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
+ logging.debug("Response to Schema Id Query received")
+ id_list = []
+ for sid_map in reply.content.get(MsgKey.schema_id):
+ id_list.append(SchemaClassId.from_map(sid_map))
+ return id_list
+ elif target == QmfQuery.TARGET_SCHEMA:
+ logging.debug("Response to Schema Query received")
+ schema_list = []
+ for schema_map in reply.content.get(MsgKey.schema):
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ schema_list.append(schema)
+ self._add_schema(schema)
+ return schema_list
+ elif target == QmfQuery.TARGET_OBJECT:
+ logging.debug("Response to Object Query received")
+ obj_list = []
+ for obj_map in reply.content.get(MsgKey.data_obj):
+ # if the object references a schema, fetch it
+ sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ schema = self._fetch_schema(sid, _agent=agent,
+ _timeout=timeout)
+ if not schema:
+ logging.warning("Unknown schema, id=%s" % sid)
+ continue
+ obj = QmfConsoleData(map_=obj_map, agent=agent,
+ _schema=schema)
+ else:
+ # no schema needed
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ obj_list.append(obj)
+ return obj_list
+ else:
+ logging.warning("Unexpected Target for a Query: '%s'" % target)
+ return None
def run(self):
global _callback_thread
@@ -684,30 +857,30 @@ class Console(Thread):
#
while self._operational:
- qLen = self._work_q.qsize()
+ # qLen = self._work_q.qsize()
- try:
- msg = self._announce_recvr.fetch(timeout = 0)
- if msg:
- self._dispatch(msg, _direct=False)
- except Empty:
- pass
+ while True:
+ try:
+ msg = self._announce_recvr.fetch(timeout=0)
+ except Empty:
+ break
+ self._dispatch(msg, _direct=False)
- try:
- msg = self._direct_recvr.fetch(timeout = 0)
- if msg:
- self._dispatch(msg, _direct=True)
- except Empty:
- pass
+ while True:
+ try:
+ msg = self._direct_recvr.fetch(timeout = 0)
+ except Empty:
+ break
+ self._dispatch(msg, _direct=True)
self._expireAgents() # check for expired agents
- if qLen == 0 and self._work_q.qsize() and self._notifier:
- # work queue went non-empty, kick
- # the application...
-
+ #if qLen == 0 and self._work_q.qsize() and self._notifier:
+ if self._work_q_put and self._notifier:
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
_callback_thread = currentThread()
- logging.info("Calling console indication")
+ logging.info("Calling console notifier.indication")
self._notifier.indication()
_callback_thread = None
@@ -761,7 +934,7 @@ class Console(Thread):
elif opcode == OpCode.object_ind:
logging.warning("!!! object_ind TBD !!!")
elif opcode == OpCode.response:
- logging.warning("!!! response TBD !!!")
+ self._handleResponseMsg(msg, cmap, version, _direct)
elif opcode == OpCode.schema_ind:
logging.warning("!!! schema_ind TBD !!!")
elif opcode == OpCode.noop:
@@ -777,26 +950,29 @@ class Console(Thread):
"""
logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
- if MsgKey.agent_info in cmap:
- try:
- # TODO: fix
- name = cmap[MsgKey.agent_info]["_name"]
- except:
- logging.warning("Bad agent-ind message received: '%s'" % msg)
- return
+ ai_map = cmap.get(MsgKey.agent_info)
+ if not ai_map or not isinstance(ai_map, type({})):
+ logging.warning("Bad agent-ind message received: '%s'" % msg)
+ return
+ name = ai_map.get("_name")
+ if not name:
+ logging.warning("Bad agent-ind message received: agent name missing"
+ " '%s'" % msg)
+ return
ignore = True
matched = False
correlated = False
+ agent_query = self._agent_discovery_filter
+
if msg.correlation_id:
correlated = self._req_correlation.isValid(msg.correlation_id)
+
if direct and correlated:
ignore = False
- elif self._agent_discovery_filter:
- logging.error("FIXME: agent discovery filter - new agent name style")
- # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
- # ignore = not matched
- matched = True; ignore = False # for now
+ elif agent_query:
+ matched = agent_query.evaluate(QmfData.create(values=ai_map))
+ ignore = not matched
if not ignore:
agent = None
@@ -820,9 +996,9 @@ class Console(Thread):
if old_timestamp == None and matched:
logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time()))
- wi = WorkItem(WorkItem.AGENT_ADDED,
- {"agent": agent})
+ wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
+ self._work_q_put = True
if correlated:
# wake up all waiters
@@ -847,6 +1023,22 @@ class Console(Thread):
self._req_correlation.put_data(msg.correlation_id, msg)
+ def _handleResponseMsg(self, msg, cmap, version, direct):
+ """
+ Process a received data-ind message.
+ """
+ # @todo code replication - clean me.
+ logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
+
+ if not self._req_correlation.isValid(msg.correlation_id):
+ logging.error("FIXME: uncorrelated response??? msg='%s'" % str(msg))
+ return
+
+ # wake up all waiters
+ logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
def _expireAgents(self):
"""
Check for expired agents and issue notifications when they expire.
@@ -865,8 +1057,10 @@ class Console(Thread):
if agent_deathtime <= now:
logging.debug("AGENT_DELETED for %s" % agent)
agent._announce_timestamp = None
- wi = WorkItem(WorkItem.AGENT_DELETED, {"agent":agent})
+ wi = WorkItem(WorkItem.AGENT_DELETED, None,
+ {"agent":agent})
self._work_q.put(wi)
+ self._work_q_put = True
else:
if (agent_deathtime - now) < next_expire_delta:
next_expire_delta = agent_deathtime - now
@@ -903,28 +1097,29 @@ class Console(Thread):
# new agent - query for its schema database for
# seeding the schema cache (@todo)
- # query = QmfQuery({QmfQuery._TARGET_SCHEMA_ID:None})
+ # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
# agent._sendQuery( query )
return agent
- def enableAgentDiscovery(self, query=None):
+ def enable_agent_discovery(self, _query=None):
"""
Called to enable the asynchronous Agent Discovery process.
Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
"""
- if query:
- if not isinstance(query, QmfQuery):
- raise TypeError("Type QmfQuery expected")
- self._agent_discovery_filter = query
+ # @todo: fix - take predicate only, not entire query!
+ if _query is not None:
+ if (not isinstance(_query, QmfQuery) or
+ _query.get_target() != QmfQuery.TARGET_AGENT):
+ raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
+ self._agent_discovery_filter = _query
else:
# create a match-all agent query (no predicate)
- self._agent_discovery_filter = QmfQuery({QmfQuery._TARGET:
- {QmfQuery._TARGET_AGENT:None}})
+ self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT)
- def disableAgentDiscovery(self):
+ def disable_agent_discovery(self):
"""
Called to disable the async Agent Discovery process enabled by
calling enableAgentDiscovery()
@@ -933,7 +1128,7 @@ class Console(Thread):
- def getWorkItemCount(self):
+ def get_workitem_count(self):
"""
Returns the count of pending WorkItems that can be retrieved.
"""
@@ -941,19 +1136,19 @@ class Console(Thread):
- def getNextWorkItem(self, timeout=None):
+ def get_next_workitem(self, timeout=None):
"""
Returns the next pending work item, or None if none available.
@todo: subclass and return an Empty event instead.
"""
try:
wi = self._work_q.get(True, timeout)
- except:
+ except Queue.Empty:
return None
return wi
- def releaseWorkItem(self, wi):
+ def release_workitem(self, wi):
"""
Return a WorkItem to the Console when it is no longer needed.
@todo: call Queue.task_done() - only 2.5+
@@ -963,6 +1158,50 @@ class Console(Thread):
"""
pass
+ def _add_schema(self, schema):
+ """
+ @todo
+ """
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass type expected")
+
+ self._lock.acquire()
+ try:
+ sid = schema.get_class_id()
+ if not self._schema_cache.has_key(sid):
+ self._schema_cache[sid] = schema
+ finally:
+ self._lock.release()
+
+ def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
+ """
+ Find the schema identified by schema_id. If not in the cache, ask the
+ agent for it.
+ """
+ if not isinstance(schema_id, SchemaClassId):
+ raise TypeError("SchemaClassId type expected")
+
+ self._lock.acquire()
+ try:
+ schema = self._schema_cache.get(schema_id)
+ if schema:
+ return schema
+ finally:
+ self._lock.release()
+
+ if _agent is None:
+ return None
+
+ # note: doQuery will add the new schema to the cache automatically.
+ slist = self.doQuery(_agent,
+ QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+ _timeout)
+ if slist:
+ return slist[0]
+ else:
+ return None
+
+
# def get_packages(self):
# plist = []
@@ -1341,9 +1580,7 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
- from qmfCommon import (qmfTypes, QmfData,
- QmfEvent, SchemaClassId, SchemaEventClass,
- SchemaProperty, SchemaObjectClass)
+ from qmfCommon import (qmfTypes, QmfEvent, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)
@@ -1362,7 +1599,7 @@ if __name__ == '__main__':
_myConsole = Console(notifier=_noteMe)
- _myConsole.enableAgentDiscovery()
+ _myConsole.enable_agent_discovery()
logging.info("Waiting...")
@@ -1430,16 +1667,16 @@ if __name__ == '__main__':
"statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
"unit": "seconds",
"desc": "time until I retire"},
- "meth1": {"desc": "A test method",
- "arguments":
+ "meth1": {"_desc": "A test method",
+ "_arguments":
{"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
"desc": "an argument 1",
"dir": "I"},
"arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
"dir": "IO",
"desc": "some weird boolean"}}},
- "meth2": {"desc": "A test method",
- "arguments":
+ "meth2": {"_desc": "A test method",
+ "_arguments":
{"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
"desc": "an 'nuther argument",
"dir":
@@ -1538,15 +1775,14 @@ if __name__ == '__main__':
logging.info( "******** Messing around with Queries ********" )
- _q1 = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
- QmfQuery._PREDICATE:
- {QmfQuery._LOGIC_AND:
- [{QmfQuery._CMP_EQ: ["vendor", "AVendor"]},
- {QmfQuery._CMP_EQ: ["product", "SomeProduct"]},
- {QmfQuery._CMP_EQ: ["name", "Thingy"]},
- {QmfQuery._LOGIC_OR:
- [{QmfQuery._CMP_LE: ["temperature", -10]},
- {QmfQuery._CMP_FALSE: None},
- {QmfQuery._CMP_EXISTS: ["namey"]}]}]}})
-
- print("_q1.mapEncode() = [%s]" % _q1)
+ _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+ QmfQueryPredicate({QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EQ: ["vendor", "AVendor"]},
+ {QmfQuery.CMP_EQ: ["product", "SomeProduct"]},
+ {QmfQuery.CMP_EQ: ["name", "Thingy"]},
+ {QmfQuery.LOGIC_OR:
+ [{QmfQuery.CMP_LE: ["temperature", -10]},
+ {QmfQuery.CMP_FALSE: None},
+ {QmfQuery.CMP_EXISTS: ["namey"]}]}]}))
+
+ print("_q1.mapEncode() = [%s]" % _q1.map_encode())
diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py
index 9683127a6e..dd55b49316 100644
--- a/qpid/python/qmf/test/agent_test.py
+++ b/qpid/python/qmf/test/agent_test.py
@@ -5,7 +5,8 @@ from threading import Semaphore
from qpid.messaging import *
from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
- QmfEvent, SchemaMethod, Notifier, SchemaClassId)
+ QmfEvent, SchemaMethod, Notifier, SchemaClassId,
+ WorkItem)
from qmfAgent import (Agent, QmfAgentData)
@@ -61,14 +62,14 @@ _agent.register_object_class(_schema)
# instantiate managed data objects matching the schema
-_obj = QmfAgentData( _agent, _schema=_schema )
-_obj.set_value("index1", 100)
-_obj.set_value("index2", "a name" )
-_obj.set_value("set_string", "UNSET")
-_obj.set_value("set_int", 0)
-_obj.set_value("query_count", 0)
-_obj.set_value("method_call_count", 0)
-_agent.add_object( _obj )
+_obj1 = QmfAgentData( _agent, _schema=_schema )
+_obj1.set_value("index1", 100)
+_obj1.set_value("index2", "a name" )
+_obj1.set_value("set_string", "UNSET")
+_obj1.set_value("set_int", 0)
+_obj1.set_value("query_count", 0)
+_obj1.set_value("method_call_count", 0)
+_agent.add_object( _obj1 )
_agent.add_object( QmfAgentData( _agent, _schema=_schema,
_values={"index1":99,
@@ -78,26 +79,52 @@ _agent.add_object( QmfAgentData( _agent, _schema=_schema,
"query_count": 0,
"method_call_count": 0} ))
+# add an "unstructured" object to the Agent
+_obj2 = QmfAgentData(_agent, _object_id="01545")
+_obj2.set_value("field1", "a value")
+_obj2.set_value("field2", 2)
+_obj2.set_value("field3", {"a":1, "map":2, "value":3})
+_obj2.set_value("field4", ["a", "list", "value"])
+_agent.add_object(_obj2)
+
+
## Now connect to the broker
_c = Connection("localhost")
_c.connect()
_agent.setConnection(_c)
+_error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
_done = False
while not _done:
- try:
- _notifier.waitForWork()
-
- _wi = _agent.getNextWorkItem(timeout=0)
- while _wi:
- print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _agent.releaseWorkItem(_wi)
- _wi = _agent.getNextWorkItem(timeout=0)
- except:
- print( "shutting down..." )
- _done = True
+ # try:
+ _notifier.waitForWork()
+
+ _wi = _agent.get_next_workitem(timeout=0)
+ while _wi:
+
+ if _wi.get_type() == WorkItem.METHOD_CALL:
+ mc = _wi.get_params()
+
+ if mc.get_name() == "set_meth":
+ print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
+ print("!!! args='%s'" % str(mc.get_args()))
+ print("!!! userid=%s" % str(mc.get_user_id()))
+ print("!!! handle=%s" % _wi.get_handle())
+ _agent.method_response(_wi.get_handle(),
+ {"rc1": 100, "rc2": "Success"})
+ else:
+ print("!!! Unknown Method name = %s" % mc.get_name())
+ _agent.method_response(_wi.get_handle(), _error=_error_data)
+ else:
+ print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
+
+ _agent.release_workitem(_wi)
+ _wi = _agent.get_next_workitem(timeout=0)
+ # except:
+ # print( "shutting down...")
+ # _done = True
print( "Removing connection... TBD!!!" )
#_myConsole.remove_connection( _c, 10 )
diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py
index 6db515dc99..a7703e24a6 100644
--- a/qpid/python/qmf/test/console_test.py
+++ b/qpid/python/qmf/test/console_test.py
@@ -4,7 +4,8 @@ from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass)
+from qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
+ SchemaClassId, SchemaClass, QmfData)
from qmfConsole import Console
@@ -33,64 +34,117 @@ _notifier = ExampleNotifier()
_myConsole = Console(notifier=_notifier)
_myConsole.addConnection( _c )
-# Discover only agents from vendor "redhat.com" that
-# are a "qmf" product....
+# Allow discovery only for the agent named "qmf.testAgent"
# @todo: replace "manual" query construction with
# a formal class-based Query API
-_query = {QmfQuery._TARGET:
- {QmfQuery._TARGET_AGENT:None},
- QmfQuery._PREDICATE:
- {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}}
-_query = QmfQuery(_query)
-
-_myConsole.enableAgentDiscovery(_query)
+_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+ QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [QmfQuery.KEY_AGENT_NAME,
+ "qmf.testAgent"]}))
+_myConsole.enable_agent_discovery(_query)
_done = False
while not _done:
# try:
_notifier.waitForWork()
- _wi = _myConsole.getNextWorkItem(timeout=0)
+ _wi = _myConsole.get_next_workitem(timeout=0)
while _wi:
- print("!!! work item received %d:%s" % (_wi.getType(),
- str(_wi.getParams())))
+ print("!!! work item received %d:%s" % (_wi.get_type(),
+ str(_wi.get_params())))
- if _wi.getType() == _wi.AGENT_ADDED:
- _agent = _wi.getParams().get("agent")
+ if _wi.get_type() == _wi.AGENT_ADDED:
+ _agent = _wi.get_params().get("agent")
if not _agent:
print("!!!! AGENT IN REPLY IS NULL !!! ")
- _query = QmfQuery( {QmfQuery._TARGET:
- {QmfQuery._TARGET_PACKAGES:None}} )
-
- _reply = _myConsole.doQuery(_agent, _query)
-
- package_list = _reply.get(MsgKey.package_info)
- for pname in package_list:
- print("!!! Querying for schema from package: %s" % pname)
- _query = QmfQuery({QmfQuery._TARGET:
- {QmfQuery._TARGET_SCHEMA_ID:None},
- QmfQuery._PREDICATE:
- {QmfQuery._CMP_EQ:
- [SchemaClassId.KEY_PACKAGE, pname]}})
-
- _reply = _myConsole.doQuery(_agent, _query)
-
- schema_id_list = _reply.get(MsgKey.schema_id)
- for sid_map in schema_id_list:
- _query = QmfQuery({QmfQuery._TARGET:
- {QmfQuery._TARGET_SCHEMA:None},
- QmfQuery._PREDICATE:
- {QmfQuery._CMP_EQ:
- [SchemaClass.KEY_SCHEMA_ID, sid_map]}})
-
- _reply = _myConsole.doQuery(_agent, _query)
-
-
-
- _myConsole.releaseWorkItem(_wi)
- _wi = _myConsole.getNextWorkItem(timeout=0)
+ _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+ oid_list = _myConsole.doQuery(_agent, _query)
+
+ print("!!!************************** REPLY=%s" % oid_list)
+
+ for oid in oid_list:
+ _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
+ oid)
+ obj_list = _myConsole.doQuery(_agent, _query)
+
+ print("!!!************************** REPLY=%s" % obj_list)
+
+ if obj_list is None:
+ obj_list={}
+
+ for obj in obj_list:
+ resp = obj.invoke_method( "set_meth",
+ {"arg_int": -11,
+ "arg_str": "are we not goons?"},
+ None,
+ 3)
+ if resp is None:
+ print("!!!*** NO RESPONSE FROM METHOD????")
+ else:
+ print("!!! method succeeded()=%s" % resp.succeeded())
+ print("!!! method exception()=%s" % resp.get_exception())
+ print("!!! method get args() = %s" % resp.get_arguments())
+
+ if not obj.is_described():
+ resp = obj.invoke_method( "bad method",
+ {"arg_int": -11,
+ "arg_str": "are we not goons?"},
+ None,
+ 3)
+ if resp is None:
+ print("!!!*** NO RESPONSE FROM METHOD????")
+ else:
+ print("!!! method succeeded()=%s" % resp.succeeded())
+ print("!!! method exception()=%s" % resp.get_exception())
+ print("!!! method get args() = %s" % resp.get_arguments())
+
+
+ #---------------------------------
+ #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name")
+
+ #obj_list = _myConsole.doQuery(_agent, _query)
+
+ #---------------------------------
+
+ # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+
+ # package_list = _myConsole.doQuery(_agent, _query)
+
+ # for pname in package_list:
+ # print("!!! Querying for schema from package: %s" % pname)
+ # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
+ # QmfQueryPredicate(
+ # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]}))
+
+ # schema_id_list = _myConsole.doQuery(_agent, _query)
+ # for sid in schema_id_list:
+ # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+ # QmfQueryPredicate(
+ # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID,
+ # sid.map_encode()]}))
+
+ # schema_list = _myConsole.doQuery(_agent, _query)
+ # for schema in schema_list:
+ # sid = schema.get_class_id()
+ # _query = QmfQuery.create_predicate(
+ # QmfQuery.TARGET_OBJECT_ID,
+ # QmfQueryPredicate({QmfQuery.CMP_EQ:
+ # [QmfData.KEY_SCHEMA_ID,
+ # sid.map_encode()]}))
+
+ # oid_list = _myConsole.doQuery(_agent, _query)
+ # for oid in oid_list:
+ # _query = QmfQuery.create_id(
+ # QmfQuery.TARGET_OBJECT, oid)
+ # _reply = _myConsole.doQuery(_agent, _query)
+
+ # print("!!!************************** REPLY=%s" % _reply)
+
+
+ _myConsole.release_workitem(_wi)
+ _wi = _myConsole.get_next_workitem(timeout=0)
# except:
# logging.info( "shutting down..." )
# _done = True