summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-12-22 21:46:01 +0000
committerTed Ross <tross@apache.org>2009-12-22 21:46:01 +0000
commit1b4a13aa9781c3f72b2b53abfbce8528de67ad16 (patch)
treeedf46890480bfa367eaa1714e88d4ab0059aa1b2
parent6bd740528732eb8da6f521be49999a37eb5f44ee (diff)
downloadqpid-python-1b4a13aa9781c3f72b2b53abfbce8528de67ad16.tar.gz
QPID-2261 - Ken G's patch applied to the qmfv2 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@893326 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qmf/qmfAgent.py183
-rw-r--r--qpid/python/qmf/qmfCommon.py532
-rw-r--r--qpid/python/qmf/qmfConsole.py282
-rw-r--r--qpid/python/qmf/test/agent_test.py33
-rw-r--r--qpid/python/qmf/test/console_test.py18
5 files changed, 740 insertions, 308 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index f211cdb383..4b76263903 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -18,15 +18,16 @@
#
import sys
-import socket
-import os
import logging
+import datetime
+import time
from threading import Thread, Lock
-from qpid.messaging import Connection, Message
+from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
- parseSubject, OpCode, Query, SchemaObjectClass, _doQuery)
+ parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey,
+ QmfData)
@@ -36,7 +37,8 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
class Agent(Thread):
def __init__(self, vendor, product, name=None,
- notifier=None, kwargs={}):
+ notifier=None, heartbeat_interval=30,
+ kwargs={}):
Thread.__init__(self)
self._running = False
self.vendor = vendor
@@ -48,10 +50,13 @@ class Agent(Thread):
self._id = AgentId(self.vendor, self.product, self.name)
self._address = str(self._id)
self._notifier = notifier
+ self._heartbeat_interval = heartbeat_interval
self._conn = None
+ self._session = None
self._lock = Lock()
- self._data_schema = {}
- self._event_schema = {}
+ self._packages = {}
+ self._schema_timestamp = long(0)
+ self._schema = {}
self._agent_data = {}
def getAgentId(self):
@@ -60,8 +65,9 @@ class Agent(Thread):
def setConnection(self, conn):
self._conn = conn
self._session = self._conn.session()
- self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE)
- self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address)
+ self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10)
+ self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address,
+ capacity=10)
self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
self._running = True
self.start()
@@ -77,7 +83,16 @@ class Agent(Thread):
self._lock.acquire()
try:
- self._data_schema[schema.getClassId()] = schema
+ classId = schema.getClassId()
+ pname = classId.getPackageName()
+ cname = classId.getClassName()
+ if pname not in self._packages:
+ self._packages[pname] = [cname]
+ else:
+ if cname not in self._packages[pname]:
+ self._packages[pname].append(cname)
+ self._schema[classId] = schema
+ self._schema_timestamp = long(time.time() * 1000)
finally:
self._lock.release()
@@ -128,35 +143,41 @@ class Agent(Thread):
def run(self):
- count = 0 # @todo: hack
+ next_heartbeat = datetime.datetime.utcnow()
while self._running:
+
+ now = datetime.datetime.utcnow()
+ print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+ if now >= next_heartbeat:
+ self._ind_sender.send(self._makeAgentIndMsg())
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+ timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
+ print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
try:
- msg = self._locate_receiver.fetch(1)
- logging.debug("Agent Locate Rcvd: '%s'" % msg)
- if msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
+ logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
+ self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
except KeyboardInterrupt:
break
- except:
+
+ try:
+ msg = self._locate_receiver.fetch(timeout = 0)
+ if msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
+ except Empty:
pass
try:
- msg = self._direct_receiver.fetch(1)
- logging.debug("Agent Msg Rcvd: '%s'" % msg)
+ msg = self._direct_receiver.fetch(timeout = 0)
if msg.content_type == "amqp/map":
self._dispatch(msg, _direct=True)
- except KeyboardInterrupt:
- break
- except:
+ except Empty:
pass
- # @todo: actually implement the periodic agent-ind
- # message generation!
- count+= 1
- if count == 5:
- count = 0
- self._ind_sender.send(self._makeAgentIndMsg())
- logging.debug("Agent Indication Sent")
+
#
# Private:
@@ -166,11 +187,11 @@ class Agent(Thread):
"""
Create an agent indication message identifying this agent
"""
+ _map = self.getAgentId().mapEncode()
+ _map["schemaTimestamp"] = self._schema_timestamp
return Message( subject=makeSubject(OpCode.agent_ind),
properties={"method":"response"},
- content={Query._TARGET_AGENT_ID:
- self.getAgentId().mapEncode()})
-
+ content={MsgKey.agent_info: _map})
def _dispatch(self, msg, _direct=False):
@@ -195,7 +216,7 @@ class Agent(Thread):
if opcode == OpCode.agent_locate:
self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.get_query:
- logging.warning("!!! GET_QUERY TBD !!!")
+ self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
logging.warning("!!! METHOD_REQ TBD !!!")
elif opcode == OpCode.cancel_subscription:
@@ -220,17 +241,9 @@ class Agent(Thread):
reply = True
if "method" in props and props["method"] == "request":
- if "query" in cmap:
- query = cmap["query"]
- # is the query an agent locate?
- if Query._TARGET in query and query[Query._TARGET] == {Query._TARGET_AGENT_ID:None}:
- if Query._PREDICATE in query:
- # does this agent match the predicate?
- reply = _doQuery( query[Query._PREDICATE], self.getAgentId().mapEncode() )
- else:
- reply = False
- logging.debug("Ignoring query - not an agent-id query: '%s'" % query)
- reply=True
+ if MsgKey.query in cmap:
+ agentIdMap = self.getAgentId().mapEncode()
+ reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap))
if reply:
try:
@@ -239,12 +252,92 @@ class Agent(Thread):
m.correlation_id = msg.correlation_id
tmp_snd.send(m)
logging.debug("agent-ind sent to [%s]" % msg.reply_to)
- except:
- logging.error("Failed to send reply to agent-ind msg '%s'" % msg)
+ except SendError, e:
+ logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e)))
else:
logging.debug("agent-locate msg not mine - no reply sent")
+ def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
+ """
+ Handle received query message
+ """
+ logging.debug("_handleQueryMsg")
+
+ if "method" in props and props["method"] == "request":
+ qmap = cmap.get(MsgKey.query)
+ if qmap:
+ query = QmfQuery(qmap)
+ target = query.getTarget()
+ if target == QmfQuery._TARGET_PACKAGES:
+ self._queryPackages( msg, query )
+ elif target == QmfQuery._TARGET_SCHEMA_ID:
+ self._querySchemaId( msg, query )
+ elif target == QmfQuery._TARGET_SCHEMA:
+ logging.warning("!!! Query TARGET=SCHEMA TBD !!!")
+ #self._querySchema( query.getPredicate(), _idOnly=False )
+ elif target == QmfQuery._TARGET_AGENT:
+ logging.warning("!!! Query TARGET=AGENT TBD !!!")
+ elif target == QmfQuery._TARGET_OBJECT_ID:
+ logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!")
+ elif target == QmfQuery._TARGET_OBJECT:
+ logging.warning("!!! Query TARGET=OBJECT TBD !!!")
+ else:
+ logging.warning("Unrecognized query target: '%s'" % str(target))
+
+
+ def _queryPackages(self, msg, query):
+ """
+ Run a query against the list of known packages
+ """
+ pnames = []
+ self._lock.acquire()
+ try:
+ for name in self._packages.iterkeys():
+ if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})):
+ pnames.append(name)
+ finally:
+ self._lock.release()
+
+ try:
+ tmp_snd = self._session.sender( msg.reply_to )
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content={MsgKey.package_info: pnames} )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ tmp_snd.send(m)
+ logging.debug("package_info sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+ def _querySchemaId( self, msg, query ):
+ """
+ """
+ schemas = []
+ self._lock.acquire()
+ try:
+ for schemaId in self._schema.iterkeys():
+ if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})):
+ schemas.append(schemaId.mapEncode())
+ finally:
+ self._lock.release()
+
+ try:
+ tmp_snd = self._session.sender( msg.reply_to )
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content={MsgKey.schema_id: schemas} )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ tmp_snd.send(m)
+ logging.debug("schema_id sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+
##==============================================================================
## OBJECTS
diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py
index daa5cea2fd..6e79b1c3f4 100644
--- a/qpid/python/qmf/qmfCommon.py
+++ b/qpid/python/qmf/qmfCommon.py
@@ -16,9 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-import sys
-import socket
-import os
import time
import logging
from threading import Lock
@@ -47,6 +44,13 @@ AMQP_QMF_SUBJECT = "qmf"
AMQP_QMF_VERSION = 4
AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
+class MsgKey(object):
+ agent_info = "agent_info"
+ query = "query"
+ package_info = "package_info"
+ schema_id = "schema_id"
+
+
class OpCode(object):
noop = "noop"
@@ -54,7 +58,7 @@ class OpCode(object):
agent_locate = "agent-locate"
cancel_subscription = "cancel-subscription"
create_subscription = "create-subscription"
- get_query = "get_query"
+ get_query = "get-query"
method_req = "method"
renew_subscription = "renew-subscription"
schema_query = "schema-query" # @todo: deprecate
@@ -316,6 +320,9 @@ class QmfData(object):
def getProperty(self, _name):
return self._properties[_name]
+ def hasProperty(self, _name):
+ return _name in self._properties
+
def setProperty(self, _name, _value):
if self._const:
raise Exception("cannot modify constant data object")
@@ -332,15 +339,11 @@ class QmfData(object):
# ignore private data members
if _name[0] == '_':
return super.__setattr__(self, _name, _value)
- # @todo: this is bad - what if the same name is used for a
- # property and statistic or argument?
if _name in self._properties:
return self.setProperty(_name, _value)
return super.__setattr__(self, _name, _value)
def __getattr__(self, _name):
- # @todo: this is bad - what if the same name is used for a
- # property and statistic or argument?
if _name in self._properties: return self.getProperty(_name)
raise AttributeError("no item named '%s' in this object" % _name)
@@ -453,6 +456,32 @@ class QmfDescribed(QmfData):
return result
+ def getProperty(self, name):
+ # meta-properties
+ if name == QmfQuery._PRED_PACKAGE:
+ return self._schemaId.getClassId().getPackageName()
+ if name == QmfQuery._PRED_CLASS:
+ return self._schemaId.getClassId().getClassName()
+ if name == QmfQuery._PRED_TYPE:
+ return self._schemaId.getClassId().getType()
+ if name == QmfQuery._PRED_HASH:
+ return self._schemaId.getClassId().getHashString()
+ if name == QmfQuery._PRED_SCHEMA_ID:
+ return self._schemaId.getClassId()
+ if name == QmfQuery._PRED_PRIMARY_KEY:
+ return self.getPrimaryKey()
+
+ return super(QmfDescribed, self).getProperty(name)
+
+
+ def hasProperty(self, name):
+ if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS, QmfQuery._PRED_TYPE,
+ QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID, QmfQuery._PRED_PRIMARY_KEY]:
+ return True
+
+ return super(QmfDescribed, self).hasProperty(name)
+
+
def mapEncode(self):
_map = {}
_map["schema_id"] = self._schemaId.mapEncode()
@@ -553,6 +582,29 @@ class QmfManaged(QmfDescribed):
def isDeleted(self):
return self._timestamps[QmfManaged._ts_delete] == 0
+
+ def getProperty(self, name):
+ # meta-properties
+ if name == QmfQuery._PRED_OBJECT_ID:
+ return self.getObjectId()
+ if name == QmfQuery._PRED_UPDATE_TS:
+ return self._timestamps[QmfManaged._ts_update]
+ if name == QmfQuery._PRED_CREATE_TS:
+ return self._timestamps[QmfManaged._ts_create]
+ if name == QmfQuery._PRED_DELETE_TS:
+ return self._timestamps[QmfManaged._ts_delete]
+
+ return super(QmfManaged, self).getProperty(name)
+
+
+ def hasProperty(self, name):
+ if name in [QmfQuery._PRED_OBJECT_ID, QmfQuery._PRED_UPDATE_TS,
+ QmfQuery._PRED_CREATE_TS, QmfQuery._PRED_DELETE_TS]:
+ return True
+
+ return super(QmfManaged, self).hasProperty(name)
+
+
def mapEncode(self):
_map = super(QmfManaged, self).mapEncode()
_map["agent_id"] = self._agentId.mapEncode()
@@ -999,110 +1051,110 @@ class MethodResponse(object):
-def _doQuery(predicate, params ):
- """
- Given the predicate from a query, and a map of named parameters, apply the predicate
- to the parameters, and return True or False.
- """
- if type(predicate) != list or len(predicate) < 1:
- return False
+# def _doQuery(predicate, params ):
+# """
+# Given the predicate from a query, and a map of named parameters, apply the predicate
+# to the parameters, and return True or False.
+# """
+# if type(predicate) != list or len(predicate) < 1:
+# return False
- opr = predicate[0]
- if opr == Query._CMP_TRUE:
- logging.info("_doQuery() TRUE: [%s]" % predicate )
- return True
- elif opr == Query._CMP_FALSE:
- logging.info("_doQuery() FALSE: [%s]" % predicate )
- return False
- elif opr == Query._LOGIC_AND:
- logging.info("_doQuery() AND: [%s]" % predicate )
- rc = False
- for exp in predicate[1:]:
- rc = _doQuery( exp, params )
- if not rc:
- break
- return rc
-
- elif opr == Query._LOGIC_OR:
- logging.info("_doQuery() OR: [%s]" % predicate )
- rc = False
- for exp in predicate[1:]:
- rc = _doQuery( exp, params )
- if rc:
- break
- return rc
-
- elif opr == Query._LOGIC_NOT:
- logging.info("_doQuery() NOT: [%s]" % predicate )
- if len(predicate) != 2:
- logging.warning("Malformed query not-expression received: '%s'" % predicate)
- return False
- return not _doQuery( predicate[1:], params )
+# elif opr == Query._LOGIC_AND:
+# logging.debug("_doQuery() AND: [%s]" % predicate )
+# rc = False
+# for exp in predicate[1:]:
+# rc = _doQuery( exp, params )
+# if not rc:
+# break
+# return rc
- elif opr in [Query._CMP_EQ, Query._CMP_NE, Query._CMP_LT,
- Query._CMP_LE, Query._CMP_GT, Query._CMP_GE,
- Query._CMP_RE]:
- if len(predicate) != 3:
- logging.warning("Malformed query compare expression received: '%s'" % predicate)
- return False
- # @todo: support regular expression match
- name = predicate[1]
- if name not in params:
- logging.warning("Malformed query, attribute '%s' not present." % name)
- return False
- arg1 = params[name]
- arg1Type = type(arg1)
- logging.info("_doQuery() CMP: [%s] value='%s'" % (predicate, arg1) )
- try:
- arg2 = arg1Type(predicate[2])
- if opr == Query._CMP_EQ: return arg1 == arg2
- if opr == Query._CMP_NE: return arg1 != arg2
- if opr == Query._CMP_LT: return arg1 < arg2
- if opr == Query._CMP_LE: return arg1 <= arg2
- if opr == Query._CMP_GT: return arg1 > arg2
- if opr == Query._CMP_GE: return arg1 >= arg2
- if opr == Query._CMP_RE:
- logging.error("!!! RE QUERY TBD !!!")
- return False
- except:
- logging.warning("Malformed query, unable to compare '%s'" % predicate)
- return False
+# elif opr == Query._LOGIC_OR:
+# logging.debug("_doQuery() OR: [%s]" % predicate )
+# rc = False
+# for exp in predicate[1:]:
+# rc = _doQuery( exp, params )
+# if rc:
+# break
+# return rc
+
+# elif opr == Query._LOGIC_NOT:
+# logging.debug("_doQuery() NOT: [%s]" % predicate )
+# if len(predicate) != 2:
+# logging.warning("Malformed query not-expression received: '%s'" % predicate)
+# return False
+# return not _doQuery( predicate[1:], params )
- elif opr == Query._CMP_PRESENT:
- logging.info("_doQuery() PRESENT: [%s]" % predicate )
- if len(predicate) != 2:
- logging.warning("Malformed query present expression received: '%s'" % predicate)
- return False
- name = predicate[1]
- return name in params
- else:
- logging.warning("Unknown query operator received: '%s'" % opr)
- return False
+# else:
+# logging.warning("Unknown query operator received: '%s'" % opr)
+# return False
-class Query:
+
+class QmfQuery(object):
+
_TARGET="what"
_PREDICATE="where"
- _TARGET_PACKAGES="_packages"
- _TARGET_OBJECT_ID="_object_id"
- _TARGET_SCHEMA="_schema"
- _TARGET_SCHEMA_ID="_schema_id"
- _TARGET_MGT_DATA="_mgt_data"
- _TARGET_AGENT_ID="_agent_id"
+ #### Query Targets ####
+ _TARGET_PACKAGES="schema_package"
+ # (returns just package names)
+ # predicate key(s):
+ #
+ #_PRED_PACKAGE
+
+
+ _TARGET_SCHEMA_ID="schema_id"
+ _TARGET_SCHEMA="schema"
+ # predicate key(s):
+ #
+ #_PRED_PACKAGE
+ #_PRED_CLASS
+ #_PRED_TYPE
+ #_PRED_HASH
+ #_PRED_SCHEMA_ID
+ # name of property (exist test only)
+ # name of method (exist test only)
+
+
+ _TARGET_AGENT="agent"
+ # predicate keys(s):
+ #
+ #_PRED_VENDOR="_vendor"
+ #_PRED_PRODUCT="_product"
+ #_PRED_NAME="_name"
+
+ _TARGET_OBJECT_ID="object_id"
+ _TARGET_OBJECT="object"
+ # package and class names must be suppled in the target value:
+ # predicate on all values or meta-values[tbd]
+ #
+ #_PRED_PACKAGE
+ #_PRED_CLASS
+ #_PRED_TYPE
+ #_PRED_HASH
+ #_primary_key
+ #_PRED_SCHEMA_ID
+ #_PRED_OBJECT_ID
+ #_PRED_UPDATE_TS
+ #_PRED_CREATE_TS
+ #_PRED_DELETE_TS
+ #<name of property>
_PRED_PACKAGE="_package_name"
_PRED_CLASS="_class_name"
_PRED_TYPE="_type"
- _PRED_HASH="_has_str"
- _PRED_SCHEMA_ID="_schema_id"
+ _PRED_HASH="_hash_str"
_PRED_VENDOR="_vendor"
_PRED_PRODUCT="_product"
_PRED_NAME="_name"
- _PRED_AGENT_ID="_agent_id"
_PRED_PRIMARY_KEY="_primary_key"
+ _PRED_SCHEMA_ID="_schema_id"
+ _PRED_OBJECT_ID="_object_id"
+ _PRED_UPDATE_TS="_update_ts"
+ _PRED_CREATE_TS="_create_ts"
+ _PRED_DELETE_TS="_delete_ts"
_CMP_EQ="eq"
_CMP_NE="ne"
@@ -1110,8 +1162,8 @@ class Query:
_CMP_LE="le"
_CMP_GT="gt"
_CMP_GE="ge"
- _CMP_RE="re_match"
- _CMP_PRESENT="exists"
+ _CMP_RE_MATCH="re_match"
+ _CMP_EXISTS="exists"
_CMP_TRUE="true"
_CMP_FALSE="false"
@@ -1119,34 +1171,219 @@ class Query:
_LOGIC_OR="or"
_LOGIC_NOT="not"
- def __init__(self, kwargs={}):
- pass
-# if "impl" in kwargs:
-# self.impl = kwargs["impl"]
-# else:
-# package = ''
-# if "key" in kwargs:
-# # construct using SchemaClassKey:
-# self.impl = qmfengine.Query(kwargs["key"])
-# elif "object_id" in kwargs:
-# self.impl = qmfengine.Query(kwargs["object_id"].impl)
-# else:
-# if "package" in kwargs:
-# package = kwargs["package"]
-# if "class" in kwargs:
-# self.impl = qmfengine.Query(kwargs["class"], package)
-# else:
-# raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']")
+ _valid_targets = [_TARGET_PACKAGES, _TARGET_OBJECT_ID, _TARGET_SCHEMA, _TARGET_SCHEMA_ID,
+ _TARGET_OBJECT, _TARGET_AGENT]
+ def __init__(self, qmap):
+ """
+ """
+ self._target_map = None
+ self._predicate = None
+
+ if type(qmap) != dict:
+ raise TypeError("constructor must be of type dict")
+
+ if self._TARGET in qmap:
+ self._target_map = qmap[self._TARGET]
+ if self._PREDICATE in qmap:
+ self.setPredicate(qmap[self._PREDICATE])
+ return
+ else:
+ # assume qmap to be the target map
+ self._target_map = qmap[:]
+
+
+ def setPredicate(self, predicate):
+ """
+ """
+ if isinstance(predicate, QmfQueryPredicate):
+ self._predicate = predicate
+ elif type(predicate) == dict:
+ self._predicate = QmfQueryPredicate(predicate)
+ else:
+ raise TypeError("Invalid type for a predicate: %s" % str(predicate))
+
+
+ def evaluate(self, qmfData):
+ """
+ """
+ # @todo: how to interpred qmfData against target??????
+ #
+ if self._predicate:
+ return self._predicate.evaluate(qmfData)
+ # no predicate - always match
+ return True
+
+ def getTarget(self):
+ for key in self._target_map.iterkeys():
+ if key in self._valid_targets:
+ return key
+ return None
+
+ def getPredicate(self):
+ return self._predicate
+
+ def mapEncode(self):
+ _map = {}
+ _map[self._TARGET] = self._target_map
+ _map[self._PREDICATE] = self._predicate.mapEncode()
+ return _map
+
+ def __repr__(self):
+ return str(self.mapEncode())
+
+
+
+class QmfQueryPredicate(object):
+ """
+ Class for Query predicates.
+ """
+ _valid_cmp_ops = [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT,
+ QmfQuery._CMP_GT, QmfQuery._CMP_LE, QmfQuery._CMP_GE,
+ QmfQuery._CMP_EXISTS, QmfQuery._CMP_RE_MATCH,
+ QmfQuery._CMP_TRUE, QmfQuery._CMP_FALSE]
+ _valid_logic_ops = [QmfQuery._LOGIC_AND, QmfQuery._LOGIC_OR, QmfQuery._LOGIC_NOT]
+
+
+ def __init__( self, pmap):
+ """
+ {"op": listOf(operands)}
+ """
+ self._oper = None
+ self._operands = []
+
+ logic_op = False
+ if type(pmap) == dict:
+ for key in pmap.iterkeys():
+ logging.error("key = %s" % key)
+ if key in self._valid_cmp_ops:
+ # coparison operation - may have "name" and "value"
+ self._oper = key
+ break
+ if key in self._valid_logic_ops:
+ logic_op = True
+ self._oper = key
+ break
+
+ if not self._oper:
+ raise TypeError("invalid predicate expression: '%s'" % str(pmap))
+
+ if type(pmap[self._oper]) == list or type(pmap[self._oper]) == tuple:
+ if logic_op:
+ for exp in pmap[self._oper]:
+ self.append(QmfQueryPredicate(exp))
+ else:
+ self._operands = list(pmap[self._oper])
+
+ else:
+ raise TypeError("invalid predicate: '%s'" % str(pmap))
+
+
+ def append(self, operand):
+ """
+ Append another operand to a predicate expression
+ """
+ logging.error("Appending: '%s'" % str(operand))
+ self._operands.append(operand)
-# def package_name(self): return self.impl.getPackage()
-# def class_name(self): return self.impl.getClass()
-# def object_id(self):
-# _objid = self.impl.getObjectId()
-# if _objid:
-# return ObjectId(_objid)
-# else:
-# return None
+
+
+ def evaluate( self, qmfData ):
+ """
+ """
+ if not isinstance(qmfData, QmfData):
+ raise TypeError("Query expects to evaluate QmfData types.")
+
+ if self._oper == QmfQuery._CMP_TRUE:
+ logging.debug("query evaluate TRUE")
+ return True
+ if self._oper == QmfQuery._CMP_FALSE:
+ logging.debug("query evaluate FALSE")
+ return False
+
+ if self._oper in [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT,
+ QmfQuery._CMP_LE, QmfQuery._CMP_GT, QmfQuery._CMP_GE,
+ QmfQuery._CMP_RE_MATCH]:
+ if len(self._operands) != 2:
+ logging.warning("Malformed query compare expression received: '%s, %s'" %
+ (self._oper, str(self._operands)))
+ return False
+ # @todo: support regular expression match
+ name = self._operands[0]
+ logging.debug("looking for: '%s'" % str(name))
+ if not qmfData.hasProperty(name):
+ logging.warning("Malformed query, attribute '%s' not present." % name)
+ return False
+ arg1 = qmfData.getProperty(name)
+ arg1Type = type(arg1)
+ logging.debug("query evaluate %s: '%s' '%s' '%s'" %
+ (name, str(arg1), self._oper, str(self._operands[1])))
+ try:
+ arg2 = arg1Type(self._operands[1])
+ if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2
+ if self._oper == QmfQuery._CMP_NE: return arg1 != arg2
+ if self._oper == QmfQuery._CMP_LT: return arg1 < arg2
+ if self._oper == QmfQuery._CMP_LE: return arg1 <= arg2
+ if self._oper == QmfQuery._CMP_GT: return arg1 > arg2
+ if self._oper == QmfQuery._CMP_GE: return arg1 >= arg2
+ if self._oper == QmfQuery._CMP_RE_MATCH:
+ logging.error("!!! RE QUERY TBD !!!")
+ return False
+ except:
+ pass
+ logging.warning("Malformed query - %s: '%s' '%s' '%s'" %
+ (name, str(arg1), self._oper, str(self._operands[1])))
+ return False
+
+
+ if self._oper == QmfQuery._CMP_EXISTS:
+ if len(self._operands) != 1:
+ logging.warning("Malformed query present expression received")
+ return False
+ name = self._operands[0]
+ logging.debug("query evaluate PRESENT: [%s]" % str(name))
+ return qmfData.hasProperty(name)
+
+ if self._oper == QmfQuery._LOGIC_AND:
+ logging.debug("query evaluate AND: '%s'" % str(self._operands))
+ for exp in self._operands:
+ if not exp.evaluate(qmfData):
+ return False
+ return True
+
+ if self._oper == QmfQuery._LOGIC_OR:
+ logging.debug("query evaluate OR: [%s]" % str(self._operands))
+ for exp in self._operands:
+ if exp.evaluate(qmfData):
+ return True
+ return False
+
+ if self._oper == QmfQuery._LOGIC_NOT:
+ logging.debug("query evaluate NOT: [%s]" % str(self._operands))
+ for exp in self._operands:
+ if exp.evaluate(qmfData):
+ return False
+ return True
+
+ logging.warning("Unrecognized query operator: [%s]" % str(self._oper))
+ return False
+
+
+ def mapEncode(self):
+ _map = {}
+ _list = []
+ for exp in self._operands:
+ if isinstance(exp, QmfQueryPredicate):
+ _list.append(exp.mapEncode())
+ else:
+ _list.append(exp)
+ _map[self._oper] = _list
+ return _map
+
+
+ def __repr__(self):
+ return str(self.mapEncode())
+
##==============================================================================
@@ -1696,7 +1933,7 @@ def SchemaMethodFactory( param ):
-class SchemaClass(object):
+class SchemaClass(QmfData):
"""
Base class for Data and Event Schema classes.
"""
@@ -1767,6 +2004,33 @@ class SchemaClass(object):
return hstr
+ def getPropertyCount(self): return len(self._properties)
+ def getProperties(self): return self._properties.copy()
+ def addProperty(self, name, prop):
+ self._properties[name] = prop
+ # need to re-generate schema hash
+ self._classId = None
+
+ def getProperty(self, name):
+ # check for meta-properties first
+ if name == QmfQuery._PRED_PACKAGE:
+ return self._pname
+ if name == QmfQuery._PRED_CLASS:
+ return self._cname
+ if name == QmfQuery._PRED_TYPE:
+ return self._type
+ if name == QmfQuery._PRED_HASH:
+ return self.getClassId().getHashString()
+ if name == QmfQuery._PRED_SCHEMA_ID:
+ return self.getClassId()
+ return super(SchemaClass, self).getProperty(name)
+
+ def hasProperty(self, name):
+ if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS,
+ QmfQuery._PRED_TYPE, QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID]:
+ return True
+ super(SchemaClass, self).hasProperty(name)
+
def mapEncode(self):
"""
Return the map encoding of this schema.
@@ -1838,19 +2102,9 @@ class SchemaObjectClass(SchemaClass):
def getPrimaryKeyList(self): return self._pkeyNames[:]
- def getPropertyCount(self): return len(self._properties)
- def getProperties(self): return self._properties.copy()
- def getProperty(self, name): return self._properties[name]
-
def getMethodCount(self): return len(self._methods)
def getMethods(self): return self._methods.copy()
def getMethod(self, name): return self._methods[name]
-
- def addProperty(self, name, prop):
- self._properties[name] = prop
- # need to re-generate schema hash
- self._classId = None
-
def addMethod(self, name, method):
self._methods[name] = method
self._classId = None
@@ -1916,18 +2170,6 @@ class SchemaEventClass(SchemaClass):
_hash )
self._properties = _props.copy()
- def getPropertyCount(self): return len(self._properties)
- def getProperties(self): return self._properties.copy()
- def getProperty(self, name): return self._properties[name]
- def addProperty(self, name, prop):
- self._properties[name] = prop
- # need to re-generate schema hash
- self._classId = None
-
- def mapEncode(self):
- _map = super(SchemaEventClass, self).mapEncode()
- return _map
-
def SchemaEventClassFactory( param ):
diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py
index e956b07dab..cc8c284579 100644
--- a/qpid/python/qmf/qmfConsole.py
+++ b/qpid/python/qmf/qmfConsole.py
@@ -21,6 +21,7 @@ import os
import logging
import platform
import time
+import datetime
import Queue
from threading import Thread
from threading import Lock
@@ -31,7 +32,8 @@ from qpid.messaging import *
from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION,
AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode,
- Query, AgentIdFactory, Notifier, _doQuery)
+ QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey,
+ QmfData)
@@ -61,6 +63,7 @@ class _Mailbox(object):
self._msgs.append(obj)
# if was empty, notify waiters
if len(self._msgs) == 1:
+ logging.error("Delivering @ %s" % time.time())
self._cv.notify()
finally:
self._cv.release()
@@ -114,6 +117,7 @@ class SequencedWaiter(object):
self.lock.acquire()
try:
if seq in self.pending:
+ logging.error("Putting seq %d @ %s" % (seq,time.time()))
self.pending[seq].deliver(new_data)
else:
logging.error( "seq %d not found!" % seq )
@@ -211,7 +215,7 @@ class ObjectProxy(object):
"""
if not self._agent:
raise Exception("No Agent associated with this object")
- newer = self._agent.get_object(Query({"object_id":None}), timeout)
+ newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout)
if newer == None:
logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
@@ -247,10 +251,9 @@ class Agent(object):
self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)
self._console = console
self._sender = None
- self._packages = [] # list of package names known to this agent
- self._classes = {} # dict [key:class] of classes known to this agent
+ self._packages = {} # map of {package-name:[list of class-names], } for this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
- self._announce_timestamp = long(0) # timestamp when last announce received
+ self._announce_timestamp = None # datetime when last announce received
logging.debug( "Created Agent with address: [%s]" % self._address )
@@ -258,31 +261,33 @@ class Agent(object):
return self._id
def isActive(self):
- return self._announce_timestamp != 0
+ return self._announce_timestamp != None
- def _send_msg(self, msg):
+ def _sendMsg(self, msg, correlation_id=None):
"""
Low-level routine to asynchronously send a message to this agent.
"""
msg.reply_to = self._console.address()
- handle = self._console._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
- msg.correlation_id = str(handle)
+ # handle = self._console._req_correlation.allocate()
+ # if handle == 0:
+ # raise Exception("Can not allocate a correlation id!")
+ # msg.correlation_id = str(handle)
+ if correlation_id:
+ msg.correlation_id = str(correlation_id)
self._sender.send(msg)
- return handle
+ # return handle
def get_packages(self):
"""
Return a list of the names of all packages known to this agent.
"""
- return self._packages[:]
+ return self._packages.keys()
def get_classes(self):
"""
Return a dictionary [key:class] of classes known to this agent.
"""
- return self._classes[:]
+ return self._packages.copy()
def get_objects(self, query, kwargs={}):
"""
@@ -329,6 +334,13 @@ class Agent(object):
def __str__(self):
return self.__repr__()
+ def _sendQuery(self, query, correlation_id=None):
+ """
+ """
+ msg = Message(subject=makeSubject(OpCode.get_query),
+ properties={"method":"request"},
+ content={MsgKey.query: query.mapEncode()})
+ self._sendMsg( msg, correlation_id )
##==============================================================================
@@ -377,7 +389,11 @@ class Console(Thread):
"""
A Console manages communications to a collection of agents on behalf of an application.
"""
- def __init__(self, name=None, notifier=None, kwargs={}):
+ def __init__(self, name=None, notifier=None,
+ reply_timeout = 60,
+ # agent_timeout = 120,
+ agent_timeout = 60,
+ kwargs={}):
"""
@type name: str
@param name: identifier for this console. Must be unique.
@@ -392,9 +408,9 @@ class Console(Thread):
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name
self._notifier = notifier
+ self._lock = Lock()
self._conn = None
self._session = None
- self._lock = Lock()
# dict of "agent-direct-address":class Agent entries
self._agent_map = {}
self._direct_recvr = None
@@ -403,8 +419,10 @@ class Console(Thread):
self._schema_cache = {}
self._req_correlation = SequencedWaiter()
self._operational = False
- self._agent_discovery_predicate = None
- self._default_timeout = 60
+ self._agent_discovery_filter = None
+ self._reply_timeout = reply_timeout
+ self._agent_timeout = agent_timeout
+ self._next_agent_expire = None
# lock out run() thread
self._cv = Condition()
# for passing WorkItems to the application
@@ -431,12 +449,12 @@ class Console(Thread):
"""
logging.debug("Destroying Console...")
if self._conn:
- self.remove_connection(self._conn, timeout)
+ self.removeConnection(self._conn, timeout)
logging.debug("Console Destroyed")
- def add_connection(self, conn):
+ def addConnection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
connection. The console will then broadcast an Agent Locate Indication over
@@ -449,8 +467,9 @@ class Console(Thread):
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
- self._direct_recvr = self._session.receiver(self._address)
- self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION)
+ self._direct_recvr = self._session.receiver(self._address, capacity=1)
+ self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION,
+ capacity=1)
self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE)
#
# Now that receivers are created, fire off the receive thread...
@@ -460,7 +479,7 @@ class Console(Thread):
- def remove_connection(self, conn, timeout=None):
+ def removeConnection(self, conn, timeout=None):
"""
Remove an AMQP connection from the console. Un-does the add_connection() operation,
and releases any agents and sessions associated with the connection.
@@ -489,40 +508,19 @@ class Console(Thread):
self._direct_recvr.close()
self._announce_recvr.close()
self._locate_sender.close()
+ self._session.close()
self._session = None
self._conn = None
logging.debug("console connection removal complete")
- def address(self):
+ def getAddress(self):
"""
The AMQP address this Console is listening to.
"""
return self._address
- def _createAgent( self, agent_id ):
- """
- Factory to create/retrieve an agent for this console
- """
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
-
- self._lock.acquire()
- try:
- if agent_id in self._agent_map:
- return self._agent_map[agent_id]
-
- agent = Agent(agent_id, self)
- agent._sender = self._session.sender(agent._address)
- self._agent_map[agent_id] = agent
- finally:
- self._lock.release()
-
- return agent
-
-
-
def destroyAgent( self, agent ):
"""
Undoes create.
@@ -562,16 +560,18 @@ class Console(Thread):
raise Exception("Can not allocate a correlation id!")
try:
tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id))
+ query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._LOGIC_AND:
+ [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]},
+ {QmfQuery._CMP_EQ: ["product", agent_id.product()]},
+ {QmfQuery._CMP_EQ: ["name", agent_id.name()]}]}})
msg = Message(subject=makeSubject(OpCode.agent_locate),
properties={"method":"request"},
- content={"query": {Query._TARGET: {Query._TARGET_AGENT_ID:None},
- Query._PREDICATE:
- [Query._LOGIC_AND,
- [Query._CMP_EQ, "vendor", agent_id.vendor()],
- [Query._CMP_EQ, "product", agent_id.product()],
- [Query._CMP_EQ, "name", agent_id.name()]]}})
+ content={MsgKey.query: query.mapEncode()})
msg.reply_to = self._address
msg.correlation_id = str(handle)
+ logging.debug("Sending Agent Locate (%s)" % time.time())
tmp_sender.send( msg )
except SendError, e:
logging.error(str(e))
@@ -579,11 +579,13 @@ class Console(Thread):
return None
if not timeout:
- timeout = self._default_timeout
+ timeout = self._reply_timeout
new_agent = None
+ logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
self._req_correlation.get_data( handle, timeout )
self._req_correlation.release(handle)
+ logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
if agent_id in self._agent_map:
@@ -605,25 +607,20 @@ class Console(Thread):
try:
msg = self._announce_recvr.fetch(timeout = 0)
if msg:
+ logging.error( "Announce Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=False)
- except:
+ except Empty:
pass
try:
msg = self._direct_recvr.fetch(timeout = 0)
if msg:
+ logging.error( "Direct Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=True)
- except:
+ except Empty:
pass
- # try:
- # logging.error("waiting for next rcvr...")
- # rcvr = self._session.next_receiver()
- # except:
- # logging.error("exception during next_receiver()")
-
- # logging.error("rcvr=%s" % str(rcvr))
-
+ self._expireAgents() # check for expired agents
if qLen == 0 and self._work_q.qsize() and self._notifier:
# work queue went non-empty, kick
@@ -634,10 +631,18 @@ class Console(Thread):
self._notifier.indication()
_callback_thread = None
- while self._operational and \
- self._announce_recvr.pending() == 0 and \
- self._direct_recvr.pending():
- time.sleep(0.5)
+ if self._operational:
+ # wait for a message to arrive or an agent
+ # to expire
+ now = datetime.datetime.utcnow()
+ if self._next_agent_expire > now:
+ timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
+ try:
+ logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
+ self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
+
logging.debug("Shutting down Console thread")
@@ -649,7 +654,6 @@ class Console(Thread):
"""
PRIVATE: Process a message received from an Agent
"""
- logging.error( "Message received from Agent! [%s]" % msg )
try:
version,opcode = parseSubject(msg.subject)
# @todo: deal with version mismatch!!!
@@ -688,13 +692,13 @@ class Console(Thread):
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
- logging.debug("_handleAgentIndMsg '%s'" % msg)
+ logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
- if Query._TARGET_AGENT_ID in cmap:
+ if MsgKey.agent_info in cmap:
try:
- agent_id = AgentIdFactory(cmap[Query._TARGET_AGENT_ID])
+ agent_id = AgentIdFactory(cmap[MsgKey.agent_info])
except:
- logging.debug("Bad agent-ind message received: '%s'" % msg)
+ logging.warning("Bad agent-ind message received: '%s'" % msg)
return
ignore = True
@@ -702,12 +706,10 @@ class Console(Thread):
correlated = False
if msg.correlation_id:
correlated = self._req_correlation.isValid(msg.correlation_id)
-
if direct and correlated:
ignore = False
- elif self._agent_discovery_predicate:
- matched = _doQuery( self._agent_discovery_predicate,
- agent_id.mapEncode() )
+ elif self._agent_discovery_filter:
+ matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
ignore = not matched
if not ignore:
@@ -723,46 +725,109 @@ class Console(Thread):
# need to create and add a new agent
agent = self._createAgent(agent_id)
- old_timestamp = agent._announce_timestamp
- agent._announce_timestamp = time.time()
+ # lock out expiration scanning code
+ self._lock.acquire()
+ try:
+ old_timestamp = agent._announce_timestamp
+ agent._announce_timestamp = datetime.datetime.utcnow()
+ finally:
+ self._lock.release()
- if old_timestamp == 0 and matched:
- logging.debug("AGENT_ADDED for %s" % agent)
+ if old_timestamp == None and matched:
+ logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED,
{"agent": agent})
self._work_q.put(wi)
if correlated:
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.error("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
+
+ def _expireAgents(self):
+ """
+ Check for expired agents and issue notifications when they expire.
+ """
+ now = datetime.datetime.utcnow()
+ if self._next_agent_expire and now < self._next_agent_expire:
+ return
+ lifetime_delta = datetime.timedelta(seconds = self._agent_timeout)
+ next_expire_delta = lifetime_delta
+ self._lock.acquire()
+ try:
+ logging.debug("!!! expiring agents '%s'" % now)
+ for agent in self._agent_map.itervalues():
+ if agent._announce_timestamp:
+ agent_deathtime = agent._announce_timestamp + lifetime_delta
+ if agent_deathtime <= now:
+ logging.debug("AGENT_DELETED for %s" % agent)
+ agent._announce_timestamp = None
+ wi = WorkItem(WorkItem.AGENT_DELETED, {"agent":agent})
+ self._work_q.put(wi)
+ else:
+ if (agent_deathtime - now) < next_expire_delta:
+ next_expire_delta = agent_deathtime - now
+
+ self._next_agent_expire = now + next_expire_delta
+ logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+ finally:
+ self._lock.release()
+
+
+
+ def _createAgent( self, agent_id ):
+ """
+ Factory to create/retrieve an agent for this console
+ """
+ if not isinstance(agent_id, AgentId):
+ raise TypeError("parameter must be an instance of class AgentId")
+
+ self._lock.acquire()
+ try:
+ if agent_id in self._agent_map:
+ return self._agent_map[agent_id]
+
+ agent = Agent(agent_id, self)
+ agent._sender = self._session.sender(agent._address)
+ self._agent_map[agent_id] = agent
+ finally:
+ self._lock.release()
+
+ # new agent - query for its schema database for
+ # seeding the schema cache (@todo)
+ # query = QmfQuery({QmfQuery._TARGET_SCHEMA_ID:None})
+ # agent._sendQuery( query )
+
+ return agent
+
+
+
def enableAgentDiscovery(self, query=None):
"""
Called to enable the asynchronous Agent Discovery process.
Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
"""
- self._agent_discovery_predicate = [Query._CMP_TRUE] # default: match all indications
if query:
- if not isinstance(query, dict):
- raise TypeError("parameter must be of type dict")
- if Query._TARGET not in query or query[Query._TARGET] != {Query._TARGET_AGENT_ID:None}:
- raise TypeError("query must be for an agent '%s'" % query)
- if Query._PREDICATE in query:
- self._agent_discovery_predicate = query[Query._PREDICATE][:]
-
+ if not isinstance(query, QmfQuery):
+ raise TypeError("Type QmfQuery expected")
+ self._agent_discovery_filter = query
+ else:
+ # create a match-all agent query (no predicate)
+ self._agent_discovery_filter = QmfQuery({QmfQuery._TARGET:
+ {QmfQuery._TARGET_AGENT:None}})
def disableAgentDiscovery(self):
"""
Called to disable the async Agent Discovery process enabled by
calling enableAgentDiscovery()
"""
- self._agent_discovery_predicate = None
+ self._agent_discovery_filter = None
- def get_workitem_count(self):
+ def getWorkItemCount(self):
"""
Returns the count of pending WorkItems that can be retrieved.
"""
@@ -770,7 +835,7 @@ class Console(Thread):
- def get_next_workitem(self, timeout=None):
+ def getNextWorkItem(self, timeout=None):
"""
Returns the next pending work item, or None if none available.
@todo: subclass and return an Empty event instead.
@@ -782,7 +847,7 @@ class Console(Thread):
return wi
- def release_workitem(self, wi):
+ def releaseWorkItem(self, wi):
"""
Return a WorkItem to the Console when it is no longer needed.
@todo: call Queue.task_done() - only 2.5+
@@ -793,7 +858,6 @@ class Console(Thread):
pass
-
# def get_packages(self):
# plist = []
# for i in range(self.impl.packageCount()):
@@ -1172,7 +1236,7 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory,
- SchemaObjectClassFactory, ObjectIdFactory, QmfData, QmfDescribed,
+ SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed,
QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
QmfEvent)
logging.getLogger().setLevel(logging.INFO)
@@ -1184,7 +1248,7 @@ if __name__ == '__main__':
logging.info( "Starting Console" )
_myConsole = Console()
- _myConsole.add_connection( _c )
+ _myConsole.addConnection( _c )
logging.info( "Finding Agent" )
_myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 )
@@ -1192,7 +1256,7 @@ if __name__ == '__main__':
logging.info( "Agent Found: %s" % _myAgent )
logging.info( "Removing connection" )
- _myConsole.remove_connection( _c, 10 )
+ _myConsole.removeConnection( _c, 10 )
logging.info( "Destroying console:" )
_myConsole.destroy( 10 )
@@ -1211,7 +1275,7 @@ if __name__ == '__main__':
_noteMe = MyNotifier( 666 )
_myConsole = Console(notifier=_noteMe)
- _myConsole.add_connection( _c )
+ _myConsole.addConnection( _c )
_myConsole.enableAgentDiscovery()
logging.info("Waiting...")
@@ -1225,15 +1289,15 @@ if __name__ == '__main__':
break
- print("Work available = %d items!" % _myConsole.get_workitem_count())
- _wi = _myConsole.get_next_workitem(timeout=0)
+ print("Work available = %d items!" % _myConsole.getWorkItemCount())
+ _wi = _myConsole.getNextWorkItem(timeout=0)
while _wi:
print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _myConsole.get_next_workitem(timeout=0)
+ _wi = _myConsole.getNextWorkItem(timeout=0)
logging.info( "Removing connection" )
- _myConsole.remove_connection( _c, 10 )
+ _myConsole.removeConnection( _c, 10 )
logging.info( "Destroying console:" )
_myConsole.destroy( 10 )
@@ -1446,3 +1510,17 @@ if __name__ == '__main__':
print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode())
+ logging.info( "******** Messing around with Queries ********" )
+
+ _q1 = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._LOGIC_AND:
+ [{QmfQuery._CMP_EQ: ["vendor", "AVendor"]},
+ {QmfQuery._CMP_EQ: ["product", "SomeProduct"]},
+ {QmfQuery._CMP_EQ: ["name", "Thingy"]},
+ {QmfQuery._LOGIC_OR:
+ [{QmfQuery._CMP_LE: ["temperature", -10]},
+ {QmfQuery._CMP_FALSE: None},
+ {QmfQuery._CMP_EXISTS: ["namey"]}]}]}})
+
+ print("_q1.mapEncode() = [%s]" % _q1)
diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py
index cd1644bca2..d413358dd8 100644
--- a/qpid/python/qmf/test/agent_test.py
+++ b/qpid/python/qmf/test/agent_test.py
@@ -43,14 +43,23 @@ _schema.addProperty( "index1",
SchemaProperty(qmfTypes.TYPE_UINT8))
_schema.addProperty( "index2",
SchemaProperty(qmfTypes.TYPE_LSTR))
+# these two properties are statistics
+_schema.addProperty( "query_count",
+ SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.addProperty( "method_call_count",
+ SchemaProperty(qmfTypes.TYPE_UINT32))
+# These two properties can be set via the method call
+_schema.addProperty( "set_string",
+ SchemaProperty(qmfTypes.TYPE_LSTR))
+_schema.addProperty( "set_int",
+ SchemaProperty(qmfTypes.TYPE_UINT32))
-# add method
-_meth = SchemaMethod( _desc="A test method" )
-_meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
-_meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
-_meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
-_schema.addMethod( "meth_1", _meth )
+# add method
+_meth = SchemaMethod( _desc="Method to set string and int in object." )
+_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+_schema.addMethod( "set_meth", _meth )
# Add schema to Agent
@@ -61,11 +70,19 @@ _agent.registerObjectClass(_schema)
_obj = QmfAgentData( _agent, _schema )
_obj.setProperty("index1", 100)
_obj.setProperty("index2", "a name" )
-
+_obj.setProperty("set_string", "UNSET")
+_obj.setProperty("set_int", 0)
+_obj.setProperty("query_count", 0)
+_obj.setProperty("method_call_count", 0)
_agent.addObject( _obj )
+
_agent.addObject( QmfAgentData( _agent, _schema,
_props={"index1":99,
- "index2": "another name"} ))
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
## Now connect to the broker
diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py
index 03b4a98808..e649b2b8e4 100644
--- a/qpid/python/qmf/test/console_test.py
+++ b/qpid/python/qmf/test/console_test.py
@@ -4,7 +4,7 @@ from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (Notifier, Query)
+from qmfCommon import (Notifier, QmfQuery)
from qmfConsole import Console
@@ -31,17 +31,19 @@ logging.info( "Starting Console" )
_notifier = ExampleNotifier()
_myConsole = Console(notifier=_notifier)
-_myConsole.add_connection( _c )
+_myConsole.addConnection( _c )
# Discover only agents from vendor "redhat.com" that
# are a "qmf" product....
# @todo: replace "manual" query construction with
# a formal class-based Query API
-_query = {Query._TARGET: {Query._TARGET_AGENT_ID:None},
- Query._PREDICATE:
- [Query._LOGIC_AND,
- [Query._CMP_EQ, "vendor", "redhat.com"],
- [Query._CMP_EQ, "product", "qmf"]]}
+_query = {QmfQuery._TARGET:
+ {QmfQuery._TARGET_AGENT:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._LOGIC_AND:
+ [{QmfQuery._CMP_EQ: ["vendor", "redhat.com"]},
+ {QmfQuery._CMP_EQ: ["product", "qmf"]}]}}
+_query = QmfQuery(_query)
_myConsole.enableAgentDiscovery(_query)
@@ -59,7 +61,7 @@ while not _done:
_done = True
logging.info( "Removing connection" )
-_myConsole.remove_connection( _c, 10 )
+_myConsole.removeConnection( _c, 10 )
logging.info( "Destroying console:" )
_myConsole.destroy( 10 )