diff options
authorTed Ross <>2010-01-11 20:46:07 +0000
committerTed Ross <>2010-01-11 20:46:07 +0000
commitc7e1aca1e90bacbf7ef579ffce686f187243b3c7 (patch)
parent1b4a13aa9781c3f72b2b53abfbce8528de67ad16 (diff)
QPID-2261 - Patch from Ken Giusti committed to the qmfv2 branch.
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 1199 insertions, 1526 deletions
diff --git a/qpid/python/qmf/ b/qpid/python/qmf/
index 4b76263903..278eafedbc 100644
--- a/qpid/python/qmf/
+++ b/qpid/python/qmf/
@@ -24,11 +24,10 @@ import time
from threading import Thread, Lock
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
- AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
- parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey,
- QmfData)
+ makeSubject, parseSubject, OpCode, QmfQuery,
+ SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+ SchemaClass)
@@ -36,21 +35,18 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
class Agent(Thread):
- def __init__(self, vendor, product, name=None,
- notifier=None, heartbeat_interval=30,
- kwargs={}):
+ def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
+ _max_msg_size=0, _capacity=10):
self._running = False
- self.vendor = vendor
- self.product = product
- if name:
- = name
- else:
- = uuid4().get_urn().split(":")[2]
- self._id = AgentId(self.vendor, self.product,
- self._address = str(self._id)
- self._notifier = notifier
- self._heartbeat_interval = heartbeat_interval
+ = str(name)
+ self._domain = _domain
+ self._notifier = _notifier
+ self._heartbeat_interval = _heartbeat_interval
+ self._max_msg_size = _max_msg_size
+ self._capacity = _capacity
self._conn = None
self._session = None
self._lock = Lock()
@@ -59,33 +55,48 @@ class Agent(Thread):
self._schema = {}
self._agent_data = {}
- def getAgentId(self):
- return AgentId(self.vendor, self.product,
+ def get_name(self):
+ return
def setConnection(self, conn):
+ my_addr =, self._domain)
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+ logging.debug("my direct addr=%s" % my_addr)
+ logging.debug("agent.locate addr=%s" % locate_addr)
+ logging.debug("agent.ind addr=%s" % ind_addr)
self._conn = conn
self._session = self._conn.session()
- self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10)
- self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address,
- capacity=10)
- self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
+ self._direct_receiver = self._session.receiver(str(my_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic, x-properties: {type:direct}}}",
+ capacity=self._capacity)
+ self._locate_receiver = self._session.receiver(str(locate_addr) +
+ ";{create:always, node-properties:{type:topic}}",
+ capacity=self._capacity)
+ self._ind_sender = self._session.sender(str(ind_addr) +
+ ";{create:always, node-properties:{type:topic}}")
self._running = True
- def registerObjectClass(self, schema):
+ def register_object_class(self, schema):
- Register an instance of a SchemaObjectClass with this agent
+ Register an instance of a SchemaClass with this agent
# @todo: need to update subscriptions
# @todo: need to mark schema as "non-const"
- if not isinstance(schema, SchemaObjectClass):
- raise TypeError("SchemaObjectClass instance expected")
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass instance expected")
- classId = schema.getClassId()
- pname = classId.getPackageName()
- cname = classId.getClassName()
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
if pname not in self._packages:
self._packages[pname] = [cname]
@@ -96,14 +107,13 @@ class Agent(Thread):
- def registerEventClass(self, cls):
- logging.error("!!!Agent.registerEventClass() TBD!!!")
+ def register_event_class(self, schema):
+ return self.register_object_class(schema)
def raiseEvent(self, qmfEvent):
logging.error("!!!Agent.raiseEvent() TBD!!!")
- def addObject(self, data ):
+ def add_object(self, data ):
Register an instance of a QmfAgentData object.
@@ -112,9 +122,13 @@ class Agent(Thread):
if not isinstance(data, QmfAgentData):
raise TypeError("QmfAgentData instance expected")
+ id_ = data.get_object_id()
+ if not id_:
+ raise TypeError("No identifier assigned to QmfAgentData!")
- self._agent_data[data.getObjectId()] = data
+ self._agent_data[id_] = data
@@ -147,37 +161,32 @@ class Agent(Thread):
while self._running:
now = datetime.datetime.utcnow()
- print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+ # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
if now >= next_heartbeat:
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
- print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
+ # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
- self._session.next_receiver(timeout = timeout)
+ self._session.next_receiver(timeout=timeout)
except Empty:
- pass
- except KeyboardInterrupt:
- break
+ continue
msg = self._locate_receiver.fetch(timeout = 0)
- if msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
except Empty:
- pass
+ msg = None
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
msg = self._direct_receiver.fetch(timeout = 0)
- if msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
except Empty:
- pass
+ msg = None
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=True)
# Private:
@@ -187,8 +196,8 @@ class Agent(Thread):
Create an agent indication message identifying this agent
- _map = self.getAgentId().mapEncode()
- _map["schemaTimestamp"] = self._schema_timestamp
+ _map = {"_name": self.get_name(),
+ "_schema_timestamp": self._schema_timestamp}
return Message( subject=makeSubject(OpCode.agent_ind),
content={MsgKey.agent_info: _map})
@@ -241,9 +250,11 @@ class Agent(Thread):
reply = True
if "method" in props and props["method"] == "request":
- if MsgKey.query in cmap:
- agentIdMap = self.getAgentId().mapEncode()
- reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap))
+ query = cmap.get(MsgKey.query)
+ if query is not None:
+ # fake a QmfData containing my identifier for the query compare
+ tmpData = QmfData(_values={"_name": self.get_name()})
+ reply = QmfQuery(query).evaluate(tmpData)
if reply:
@@ -272,10 +283,9 @@ class Agent(Thread):
if target == QmfQuery._TARGET_PACKAGES:
self._queryPackages( msg, query )
elif target == QmfQuery._TARGET_SCHEMA_ID:
- self._querySchemaId( msg, query )
+ self._querySchema( msg, query, _idOnly=True )
elif target == QmfQuery._TARGET_SCHEMA:
- logging.warning("!!! Query TARGET=SCHEMA TBD !!!")
- #self._querySchema( query.getPredicate(), _idOnly=False )
+ self._querySchema( msg, query)
elif target == QmfQuery._TARGET_AGENT:
logging.warning("!!! Query TARGET=AGENT TBD !!!")
elif target == QmfQuery._TARGET_OBJECT_ID:
@@ -294,7 +304,7 @@ class Agent(Thread):
for name in self._packages.iterkeys():
- if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})):
+ if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
@@ -312,23 +322,32 @@ class Agent(Thread):
logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
- def _querySchemaId( self, msg, query ):
+ def _querySchema( self, msg, query, _idOnly=False ):
schemas = []
- for schemaId in self._schema.iterkeys():
- if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})):
- schemas.append(schemaId.mapEncode())
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
tmp_snd = self._session.sender( msg.reply_to )
+ if _idOnly:
+ content = {MsgKey.schema_id: schemas}
+ else:
+ content = {MsgKey.schema:schemas}
m = Message( subject=makeSubject(OpCode.data_ind),
- content={MsgKey.schema_id: schemas} )
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
@@ -340,35 +359,45 @@ class Agent(Thread):
-class QmfAgentData(QmfManaged):
+class QmfAgentData(QmfData):
A managed data object that is owned by an agent.
- def __init__(self, _agent, _schema, _props={}):
- """
- @type _agent: class Agent
- @param _agent: the agent that manages this object.
- @type _schema: class SchemaObjectClass
- @param _schema: the schema used to describe this data object
- @type _props: map of "name"=<value> pairs
- @param _props: initial values for all properties in this object
- """
- super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(),
- _schema=_schema,
- _props=_props)
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+ _tag=_tag, _ctime=ctime,
+ _utime=ctime, _object_id=_object_id,
+ _schema=_schema, _const=False)
+ self._agent = agent
def destroy(self):
- self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000)
+ self._dtime = long(time.time() * 1000)
# @todo: publish change
- def setProperty( self, _name, _value):
- super(QmfAgentData, self).setProperty(_name, _value)
+ def is_deleted(self):
+ return self._dtime == 0
+ def set_value(self, _name, _value, _subType=None):
+ super(QmfAgentData, self).set_value(_name, _value, _subType)
# @todo: publish change
+ def inc_value(self, name, delta):
+ """ add the delta to the property """
+ # @todo: need to take write-lock
+ logging.error(" TBD!!!")
+ def dec_value(self, name, delta):
+ """ subtract the delta from the property """
+ # @todo: need to take write-lock
+ logging.error(" TBD!!!")
@@ -377,22 +406,87 @@ class QmfAgentData(QmfManaged):
if __name__ == '__main__':
- import time
+ # static test cases - no message passing, just exercise API
+ from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+ SchemaMethod, SchemaEventClass)
- "Starting Connection" )
- _c = Connection("localhost")
- _c.connect()
- #c.start()
- "Starting Agent" )
- _agent = Agent("", "agent", "tross")
- _agent.setConnection(_c)
+ "Create an Agent" )
+ _agent_name = AgentName("", "agent", "tross")
+ _agent = Agent(str(_agent_name))
- "Running Agent" )
- while True:
- try:
- time.sleep(10)
- except KeyboardInterrupt:
- break
+ "Get agent name: '%s'" % _agent.get_name())
+ "Create SchemaObjectClass" )
+ _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"])
+ # add properties
+ _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+ # these two properties are statistics
+ _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ # These two properties can be set via the method call
+ _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+ # add method
+ _meth = SchemaMethod(_desc="Method to set string and int in object." )
+ _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+ # Add schema to Agent
+ print("Schema Map='%s'" % str(_schema.map_encode()))
+ _agent.register_object_class(_schema)
+ # instantiate managed data objects matching the schema
+ "Create QmfAgentData" )
+ _obj = QmfAgentData( _agent, _schema=_schema )
+ _obj.set_value("index1", 100)
+ _obj.set_value("index2", "a name" )
+ _obj.set_value("set_string", "UNSET")
+ _obj.set_value("set_int", 0)
+ _obj.set_value("query_count", 0)
+ _obj.set_value("method_call_count", 0)
+ print("Obj1 Map='%s'" % str(_obj.map_encode()))
+ _agent.add_object( _obj )
+ _obj = QmfAgentData( _agent,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0},
+ _schema=_schema)
+ print("Obj2 Map='%s'" % str(_obj.map_encode()))
+ _agent.add_object(_obj)
+ ##############
+ "Create SchemaEventClass" )
+ _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test data schema",
+ _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+ _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR))
+ print("Event Map='%s'" % str(_event.map_encode()))
+ _agent.register_event_class(_event)
diff --git a/qpid/python/qmf/ b/qpid/python/qmf/
index 6e79b1c3f4..9bcb166c6b 100644
--- a/qpid/python/qmf/
+++ b/qpid/python/qmf/
@@ -34,11 +34,10 @@ except ImportError:
## Constants
-AMQP_QMF_TOPIC = "amq.topic"
-AMQP_QMF_AGENT_LOCATE = "amq.topic/agent.locate"
-AMQP_QMF_AGENT_INDICATION = "amq.topic/agent.ind"
+AMQP_QMF_AGENT_LOCATE = "agent.locate"
@@ -49,6 +48,7 @@ class MsgKey(object):
query = "query"
package_info = "package_info"
schema_id = "schema_id"
+ schema = "schema"
class OpCode(object):
@@ -108,26 +108,71 @@ class Notifier(object):
-## Agent Identification
+## Addressing
-class AgentId(object):
+class QmfAddress(object):
- Uniquely identifies a management agent within the entire management domain.
- Map format:
- map["vendor"] = str, name of vendor of the agent
- map["product"] = str, name of product using agent
- map["name"] = str, name of agent, unique within vendor and product.
+ """
+ TYPE_DIRECT = "direct"
+ TYPE_TOPIC = "topic"
+ ADDRESS_FMT = "qmf.%s.%s/%s"
+ DEFAULT_DOMAIN = "default"
+ def __init__(self, name, domain, type_):
+ self._name = name
+ self._domain = domain
+ self._type = type_
+ def _direct(cls, name, _domain=None):
+ if _domain is None:
+ _domain = QmfAddress.DEFAULT_DOMAIN
+ return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT)
+ direct = classmethod(_direct)
+ def _topic(cls, name, _domain=None):
+ if _domain is None:
+ _domain = QmfAddress.DEFAULT_DOMAIN
+ return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC)
+ topic = classmethod(_topic)
+ def get_address(self):
+ return str(self)
+ def __repr__(self):
+ return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name)
+class AgentName(object):
+ """
+ Uniquely identifies a management agent within the management domain.
_separator = ":"
- def __init__(self, vendor, product, name):
+ def __init__(self, vendor, product, name, str_=None):
Note: this object must be immutable, as it is used to index into a dictionary
- self._vendor = vendor
- self._product = product
- self._name = name
+ if str_:
+ # construct from string representation
+ if _str.count(AgentId._separator) < 2:
+ raise TypeError("AgentId string format must be ''")
+ self._vendor, self._product, self._name = param.split(AgentId._separator)
+ else:
+ self._vendor = vendor
+ self._product = product
+ self._name = name
+ def _from_str(cls, str_):
+ return cls(None, None, None, str_=str_)
+ from_str = classmethod(_from_str)
def vendor(self):
return self._vendor
@@ -138,15 +183,8 @@ class AgentId(object):
def name(self):
return self._name
- def mapEncode(self):
- _map = {}
- _map["vendor"] = self._vendor
- _map["product"] = self._product
- _map["name"] = self._name
- return _map
def __cmp__(self, other):
- if not isinstance(other, AgentId) :
+ if not isinstance(other, AgentName) :
raise TypeError("Invalid types for compare")
# return 1
me = str(self)
@@ -162,40 +200,10 @@ class AgentId(object):
return (self._vendor, self._product, self._name).__hash__()
def __repr__(self):
- return self._vendor + AgentId._separator + \
- self._product + AgentId._separator + \
+ return self._vendor + AgentName._separator + \
+ self._product + AgentName._separator + \
- def __str__(self):
- return self.__repr__()
-def AgentIdFactory( param ):
- """
- Factory for constructing an AgentId class from various sources.
- @type param: various
- @param param: object to use for constructing AgentId
- @rtype: AgentId
- @returns: a new AgentId instance
- """
- if type(param) == str:
- if param.count(AgentId._separator) < 2:
- raise TypeError("AgentId string format must be 'vendor:product:name'")
- vendor, product, name = param.split(AgentId._separator)
- return AgentId(vendor, product, name)
- if type(param) == dict:
- # construct from map encoding
- if not "vendor" in param:
- raise TypeError("requires 'vendor' value")
- if not "product" in param:
- raise TypeError("requires 'product' value")
- if not "name" in param:
- raise TypeError("requires 'name' value")
- return AgentId( param["vendor"], param["product"], param["name"] )
- raise TypeError("Invalid type for AgentId construction")
@@ -203,451 +211,293 @@ def AgentIdFactory( param ):
-class ObjectId(object):
- """
- An instance of managed object must be uniquely identified within the
- management system. Each managed object is given a key that is unique
- within the domain of the object's managing Agent. Note that these
- keys are not unique across Agents. Therefore, a globally unique name
- for an instance of a managed object is the concatenation of the
- object's key and the managing Agent's AgentId.
- Map format:
- map["agent_id"] = map representation of AgentId
- map["primary_key"] = str, key for managed object
- """
- def __init__(self, agentid, name):
- if not isinstance(agentid, AgentId):
- raise TypeError("requires an AgentId class")
- self._agentId = agentid;
- self._name = name;
- def getAgentId(self):
- """
- @rtype: class AgentId
- @returns: Id of agent that manages the object.
- """
- return self._agentId
- def getPrimaryKey(self):
- """
- @rtype: str
- @returns: Key of managed object.
- """
- return self._name
- def mapEncode(self):
- _map = {}
- _map["agent_id"] = self._agentId.mapEncode()
- _map["primary_key"] = self._name
- return _map
- def __repr__(self):
- return "%s:%s" % (self._agentId, self._name)
- def __cmp__(self, other):
- if not isinstance(other, ObjectId) :
- raise TypeError("Invalid types for compare")
- if self._agentId < other._agentId:
- return -1
- if self._agentId > other._agentId:
- return 1
- if self._name < other._name:
- return -1
- if self._name > other._name:
- return 1
- return 0
- def __hash__(self):
- return (hash(self._agentId), self._name).__hash__()
-def ObjectIdFactory( param ):
- """
- Factory for constructing ObjectIds from various sources
- @type param: various
- @param param: object to use for constructing ObjectId
- @rtype: ObjectId
- @returns: a new ObjectId instance
+class _mapEncoder(object):
+ """
+ virtual base class for all objects that support being converted to a map
- if type(param) == dict:
- # construct from map
- if "agent_id" not in param:
- raise TypeError("requires 'agent_id' value")
- if "primary_key" not in param:
- raise TypeError("requires 'primary_key' value")
- return ObjectId( AgentIdFactory(param["agent_id"]), param["primary_key"] )
- else:
- raise TypeError("Invalid type for ObjectId construction")
+ def map_encode(self):
+ raise Exception("The map_encode method my be overridden.")
-class QmfData(object):
+class QmfData(_mapEncoder):
Base data class representing arbitrarily structure data. No schema or
managing agent is associated with data of this class.
Map format:
- map["properties"] = map of unordered "name"=<value> pairs (optional)
+ map["_values"] = map of unordered "name"=<value> pairs (optional)
+ map["_subtype"] = map of unordered "name"="subtype string" pairs (optional)
+ map["_tag"] = application-specific tag for this instance (optional)
- def __init__(self, _props={}, _const=False):
- """
- @type _props: dict
- @param _props: dictionary of initial name=value pairs for object's property data.
+ KEY_VALUES = "_values"
+ KEY_SUBTYPES = "_subtypes"
+ KEY_TAG="_tag"
+ KEY_OBJECT_ID = "_object_id"
+ KEY_SCHEMA_ID = "_schema_id"
+ KEY_UPDATE_TS = "_update_ts"
+ KEY_CREATE_TS = "_create_ts"
+ KEY_DELETE_TS = "_delete_ts"
+ def __init__(self,
+ _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _ctime = 0, _utime = 0, _dtime = 0,
+ _map=None,
+ _schema=None, _const=False):
+ """
+ @type _values: dict
+ @param _values: dictionary of initial name=value pairs for object's
+ named data.
+ @type _subtypes: dict
+ @param _subtype: dictionary of subtype strings for each of the object's
+ named data.
+ @type _desc: string
+ @param _desc: Human-readable description of this data object.
@type _const: boolean
@param _const: if true, this object cannot be modified
- self._properties = _props.copy()
+ self._schema_id = None
+ if _map is not None:
+ # construct from map
+ _tag = _map.get(self.KEY_TAG, _tag)
+ _values = _map.get(self.KEY_VALUES, _values)
+ _subtypes = _map.get(self.KEY_SUBTYPES, _subtypes)
+ _object_id = _map.get(self.KEY_OBJECT_ID, _object_id)
+ sid = _map.get(self.KEY_SCHEMA_ID)
+ if sid:
+ self._schema_id = SchemaClassId(_map=sid)
+ _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime))
+ _utime = long(_map.get(self.KEY_UPDATE_TS, _utime))
+ _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime))
+ self._values = _values.copy()
+ self._subtypes = _subtypes.copy()
+ self._tag = _tag
+ self._ctime = _ctime
+ self._utime = _utime
+ self._dtime = _dtime
self._const = _const
- self._managed = False # not managed by an Agent
- self._described = False # not described by a schema
- def isManaged(self):
- return self._managed
- def isDescribed(self):
- return self._described
- def getProperties(self):
- return self._properties.copy()
+ if _object_id is not None:
+ self._object_id = str(_object_id)
+ else:
+ self._object_id = None
- def getProperty(self, _name):
- return self._properties[_name]
+ if _schema is not None:
+ self._set_schema(_schema)
+ else:
+ # careful: map constructor may have already set self._schema_id, do
+ # not override it!
+ self._schema = None
+ def _create(cls, values, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None, _const=False):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ return cls(_values=values, _subtypes=_subtypes, _tag=_tag,
+ _ctime=ctime, _utime=ctime,
+ _object_id=_object_id, _schema=_schema, _const=_const)
+ create = classmethod(_create)
+ def _from_map(cls, map_, _schema=None, _const=False):
+ return cls(_map=map_, _schema=_schema, _const=_const)
+ from_map = classmethod(_from_map)
+ def is_managed(self):
+ return self._object_id is not None
+ def is_described(self):
+ return self._schema_id is not None
+ def get_tag(self):
+ return self._tag
+ def get_value(self, name):
+ # meta-properties:
+ if name == SchemaClassId.KEY_PACKAGE:
+ if self._schema_id:
+ return self._schema_id.get_package_name()
+ return None
+ if name == SchemaClassId.KEY_CLASS:
+ if self._schema_id:
+ return self._schema_id.get_class_name()
+ return None
+ if name == SchemaClassId.KEY_TYPE:
+ if self._schema_id:
+ return self._schema_id.get_type()
+ return None
+ if name == SchemaClassId.KEY_HASH:
+ if self._schema_id:
+ return self._schema_id.get_hash_string()
+ return None
+ if name == self.KEY_SCHEMA_ID:
+ return self._schema_id
+ if name == self.KEY_OBJECT_ID:
+ return self._object_id
+ if name == self.KEY_TAG:
+ return self._tag
+ if name == self.KEY_UPDATE_TS:
+ return self._utime
+ if name == self.KEY_CREATE_TS:
+ return self._ctime
+ if name == self.KEY_DELETE_TS:
+ return self._dtime
+ return self._values.get(name)
+ def has_value(self, name):
+ if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS,
+ SchemaClassId.KEY_TYPE, SchemaClassId.KEY_HASH,
+ self.KEY_SCHEMA_ID]:
+ return self._schema_id is not None
+ if name in [self.KEY_UPDATE_TS, self.KEY_CREATE_TS,
+ self.KEY_DELETE_TS]:
+ return True
+ if name == self.KEY_OBJECT_ID:
+ return self._object_id is not None
+ if name == self.KEY_TAG:
+ return self._tag is not None
- def hasProperty(self, _name):
- return _name in self._properties
+ return name in self._values
- def setProperty(self, _name, _value):
+ def set_value(self, _name, _value, _subType=None):
if self._const:
raise Exception("cannot modify constant data object")
- self._properties[_name] = _value
+ self._values[_name] = _value
+ if _subType:
+ self._subtypes[_name] = _subType
return _value
- def mapEncode(self):
- return self._properties.copy()
- def __repr__(self):
- return str(self.mapEncode())
- def __setattr__(self, _name, _value):
- # ignore private data members
- if _name[0] == '_':
- return super.__setattr__(self, _name, _value)
- if _name in self._properties:
- return self.setProperty(_name, _value)
- return super.__setattr__(self, _name, _value)
- def __getattr__(self, _name):
- if _name in self._properties: return self.getProperty(_name)
- raise AttributeError("no item named '%s' in this object" % _name)
- def __getitem__(self, _name):
- return self.__getattr__(_name)
- def __setitem__(self, _name, _value):
- return self.__setattr__(_name, _value)
-def QmfDataFactory( param, const=False ):
- """
- Factory for constructing an QmfData class from various sources.
- @type param: various
- @param param: object to use for constructing QmfData instance
- @rtype: QmfData
- @returns: a new QmfData instance
- """
- if type(param) == dict:
- # construct from map
- return QmfData( _props=param, _const=const )
- else:
- raise TypeError("Invalid type for QmfData construction")
+ def get_subtype(self, _name):
+ return self._subtypes.get(_name)
-class QmfDescribed(QmfData):
- """
- Data that has a formally defined structure is represented by the
- QmfDescribed class. This class extends the QmfData class by
- associating the data with a formal schema (SchemaObjectClass).
- Map format:
- map["schema_id"] = map representation of a SchemaClassId instance
- map["properties"] = map representation of a QmfData instance
- """
- def __init__(self, _schema=None, _schemaId=None, _props={}, _const=False ):
- """
- @type _schema: class SchemaClass or derivative
- @param _schema: an instance of the schema used to describe this object.
- @type _schemaId: class SchemaClassId
- @param _schemaId: if schema instance not available, this is mandatory.
- @type _props: dict
- @param _props: dictionary of initial name=value pairs for object's property data.
- @type _const: boolean
- @param _const: if true, this object cannot be modified
- """
- super(QmfDescribed, self).__init__(_props, _const)
- self._validated = False
- self._described = True
- self._schema = _schema
- if _schema:
- self._schemaId = _schema.getClassId()
- if self._const:
- self._validate()
- else:
- if _schemaId:
- self._schemaId = _schemaId
- else:
- raise Exception("A SchemaClass or SchemaClassId must be provided")
- def getSchemaClassId(self):
+ def get_schema_class_id(self):
@rtype: class SchemaClassId
@returns: the identifier of the Schema that describes the structure of the data.
- return self._schemaId
- def setSchema(self, _schema):
- """
- @type _schema: class SchemaClass or derivative
- @param _schema: instance of schema used to describe the structure of the data.
- """
- if self._schemaId != _schema.getClassId():
- raise Exception("Cannot reset current schema to a different schema")
- oldSchema = self._schema
- self._schema = _schema
- if not oldSchema and self._const:
- self._validate()
+ return self._schema_id
- def getPrimaryKey(self):
+ def get_object_id(self):
- Get a string composed of the object's primary key properties.
+ Get the instance's identification string.
@rtype: str
- @returns: a string composed from primary key property values.
+ @returns: the identification string, or None if not assigned and id.
+ if self._object_id:
+ return self._object_id
+ # if object id not assigned, see if schema defines a set of field
+ # values to use as an id
if not self._schema:
- raise Exception("schema not available")
+ return None
+ ids = self._schema.get_id_names()
+ if not ids:
+ return None
if not self._validated:
- if self._schema._pkeyNames == 0:
- if len(self._properties) != 1:
- raise Exception("no primary key defined")
- return str(self._properties.values()[0])
result = u""
- for pkey in self._schema._pkeyNames:
- if result != u"":
- result += u":"
+ for key in ids:
- valstr = unicode(self._properties[pkey])
+ result += unicode(self._values[key])
- valstr = u"<undecodable>"
- result += valstr
+ logging.error("get_object_id(): cannot convert value '%s'."
+ % key)
+ return None
+ self._object_id = result
return result
- def getProperty(self, name):
- # meta-properties
- if name == QmfQuery._PRED_PACKAGE:
- return self._schemaId.getClassId().getPackageName()
- if name == QmfQuery._PRED_CLASS:
- return self._schemaId.getClassId().getClassName()
- if name == QmfQuery._PRED_TYPE:
- return self._schemaId.getClassId().getType()
- if name == QmfQuery._PRED_HASH:
- return self._schemaId.getClassId().getHashString()
- if name == QmfQuery._PRED_SCHEMA_ID:
- return self._schemaId.getClassId()
- if name == QmfQuery._PRED_PRIMARY_KEY:
- return self.getPrimaryKey()
- return super(QmfDescribed, self).getProperty(name)
- def hasProperty(self, name):
- if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS, QmfQuery._PRED_TYPE,
- return True
- return super(QmfDescribed, self).hasProperty(name)
- def mapEncode(self):
+ def map_encode(self):
_map = {}
- _map["schema_id"] = self._schemaId.mapEncode()
- _map["properties"] = super(QmfDescribed, self).mapEncode()
+ if self._tag:
+ _map[self.KEY_TAG] = self._tag
+ # data in the _values map may require recursive map_encode()
+ vmap = {}
+ for name,val in self._values.iteritems():
+ if isinstance(val, _mapEncoder):
+ vmap[name] = val.map_encode()
+ else:
+ # otherwise, just toss in the native type...
+ vmap[name] = val
+ _map[self.KEY_VALUES] = vmap
+ # subtypes are never complex, so safe to just copy
+ _map[self.KEY_SUBTYPES] = self._subtypes.copy()
+ if self._object_id:
+ _map[self.KEY_OBJECT_ID] = self._object_id
+ if self._schema_id:
+ _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode()
return _map
+ def _set_schema(self, schema):
+ self._validated = False
+ self._schema = schema
+ if schema:
+ self._schema_id = schema.get_class_id()
+ if self._const:
+ self._validate()
+ else:
+ self._schema_id = None
def _validate(self):
Compares this object's data against the associated schema. Throws an
exception if the data does not conform to the schema.
- for name,val in self._schema._properties.iteritems():
+ props = self._schema.get_properties()
+ for name,val in props.iteritems():
# @todo validate: type compatible with amqp_type?
# @todo validate: primary keys have values
- if name not in self._properties:
+ if name not in self._values:
if val._isOptional:
# ok not to be present, put in dummy value
# to simplify access
- self._properties[name] = None
+ self._values[name] = None
raise Exception("Required property '%s' not present." % name)
self._validated = True
+ def __repr__(self):
+ return "QmfData=<<" + str(self.map_encode()) + ">>"
+ def __setattr__(self, _name, _value):
+ # ignore private data members
+ if _name[0] == '_':
+ return super(QmfData, self).__setattr__(_name, _value)
+ if _name in self._values:
+ return self.set_value(_name, _value)
+ return super(QmfData, self).__setattr__(_name, _value)
-def QmfDescribedFactory( param, _schema=None ):
- """
- Factory for constructing an QmfDescribed class from various sources.
- @type param: various
- @param param: object to use for constructing QmfDescribed instance
- @type _schema: SchemaClass
- @param _schema: instance of the SchemaClass that describes this instance
- @rtype: QmfDescribed
- @returns: a new QmfDescribed instance
- """
- if type(param) == dict:
- # construct from map
- if "schema_id" not in param:
- raise TypeError("requires 'schema_id' value")
- if "properties" not in param:
- raise TypeError("requires 'properties' value")
- return QmfDescribed( _schema=_schema, _schemaId=SchemaClassIdFactory( param["schema_id"] ),
- _props = param["properties"] )
- else:
- raise TypeError("Invalid type for QmfDescribed construction")
-class QmfManaged(QmfDescribed):
- """
- Data that has a formally defined structure, and for which each
- instance of the data is managed by a particular Agent is represented
- by the QmfManaged class. This class extends the QmfDescribed class by
- associating the described data with a particular Agent.
- Map format:
- map["object_id"] = map representation of an ObjectId value
- map["schema_id"] = map representation of a SchemaClassId instance
- map["qmf_data"] = map representation of a QmfData instance
- map["timestamps"] = array of AMQP timestamps. [0] = time of last update,
- [1] = creation timestamp, [2] = deletion timestamp or zero.
- """
- _ts_update = 0
- _ts_create = 1
- _ts_delete = 2
- def __init__( self, _agentId=None, _schema=None, _schemaId=None,
- _props={}, _const=False ):
- """
- @type _agentId: class AgentId
- @param _agentId: globally unique identifier of the managing agent.
- @type _schema: class SchemaClass or derivative
- @param _schema: an instance of the schema used to describe this object.
- @type _schemaId: class SchemaClassId
- @param _schemaId: if schema instance not available, this is mandatory.
- @type _props: dict
- @param _props: dictionary of initial name=value pairs for object's property data.
- @type _const: boolean
- @param _const: if true, this object cannot be modified
- """
- super(QmfManaged, self).__init__(_schema=_schema, _schemaId=_schemaId,
- _props=_props, _const=_const)
- self._managed = True
- self._agentId = _agentId
- # timestamp, in millisec since epoch UTC
- _ctime = long(time.time() * 1000)
- self._timestamps = [_ctime,_ctime,0]
- def getObjectId(self):
- """
- @rtype: class ObjectId
- @returns: the ObjectId that uniquely identifies this managed object.
- """
- return ObjectId(self._agentId, self.getPrimaryKey())
- def isDeleted(self):
- return self._timestamps[QmfManaged._ts_delete] == 0
- def getProperty(self, name):
- # meta-properties
- if name == QmfQuery._PRED_OBJECT_ID:
- return self.getObjectId()
- if name == QmfQuery._PRED_UPDATE_TS:
- return self._timestamps[QmfManaged._ts_update]
- if name == QmfQuery._PRED_CREATE_TS:
- return self._timestamps[QmfManaged._ts_create]
- if name == QmfQuery._PRED_DELETE_TS:
- return self._timestamps[QmfManaged._ts_delete]
- return super(QmfManaged, self).getProperty(name)
- def hasProperty(self, name):
- if name in [QmfQuery._PRED_OBJECT_ID, QmfQuery._PRED_UPDATE_TS,
- return True
- return super(QmfManaged, self).hasProperty(name)
- def mapEncode(self):
- _map = super(QmfManaged, self).mapEncode()
- _map["agent_id"] = self._agentId.mapEncode()
- _map["timestamps"] = self._timestamps[:]
- return _map
+ def __getattr__(self, _name):
+ if _name != "_values" and _name in self._values:
+ return self._values[_name]
+ raise AttributeError("no value named '%s' in this object" % _name)
+ def __getitem__(self, _name):
+ return self.__getattr__(_name)
-def QmfManagedFactory( param, _schema=None ):
- """
- Factory for constructing an QmfManaged instance from various sources.
- @type param: various
- @param param: object to use for constructing QmfManaged instance
- @type _schema: SchemaClass
- @param _schema: instance of the SchemaClass that describes this instance
- @rtype: QmfManaged
- @returns: a new QmfManaged instance
- """
- if type(param) == dict:
- # construct from map
- if "agent_id" not in param:
- raise TypeError("requires 'agent_id' value")
- _qd = QmfDescribedFactory( param, _schema )
- return QmfManaged( _agentId = AgentIdFactory(param["agent_id"]),
- _schema=_schema, _schemaId=_qd._schemaId,
- _props = _qd._properties )
- else:
- raise TypeError("Invalid type for QmfManaged construction")
+ def __setitem__(self, _name, _value):
+ return self.__setattr__(_name, _value)
-class QmfEvent(QmfDescribed):
+class QmfEvent(QmfData):
A QMF Event is a type of described data that is not managed. Events are
notifications that are sent by Agents. An event notifies a Console of a
change in some aspect of the system under managment.
- def __init__(self, _map=None,
- _timestamp=None, _agentId=None,
- _schema=None, _schemaId=None,
- _props={}, _const=False):
+ KEY_TIMESTAMP = "_timestamp"
+ def __init__(self, _timestamp=None, _values={}, _subtypes={}, _tag=None,
+ _map=None,
+ _schema=None, _const=True):
@type _map: dict
@param _map: if not None, construct instance from map representation.
@@ -661,240 +511,52 @@ class QmfEvent(QmfDescribed):
@type _schemaId: class SchemaClassId (event)
@param _schemaId: identi
- if _map:
- if type(_map) != dict:
- raise TypeError("parameter '_map' must be of type 'dict'")
- if "timestamp" not in _map:
- pass
- if "agent_id" not in _map:
- pass
- _qe = QmfDescribedFactory( _map, _schema )
- super(QmfEvent, self).__init__( _schema=_qe._schema, _schemaId=_qe._schemaId,
- _props=_qe._properties, _const=_qe._const )
- self._timestamp = long(_map["timestamp"])
- self._agentId = AgentIdFactory(_map["agent_id"])
+ if _map is not None:
+ # construct from map
+ super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
+ _const=_const)
+ _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
- super(QmfEvent, self).__init__(_schema=_schema, _schemaId=_schemaId,
- _props=_props, _const=_const)
+ super(QmfEvent, self).__init__(_values=_values,
+ _subtypes=_subtypes, _tag=_tag,
+ _schema=_schema, _const=_const)
+ if _timestamp is None:
+ raise TypeError("QmfEvent: a valid timestamp is required.")
+ try:
self._timestamp = long(_timestamp)
- self._agentId = _agentId
+ except:
+ raise TypeError("QmfEvent: a numeric timestamp is required.")
+ def _create(cls, timestamp, values,
+ _subtypes={}, _tag=None, _schema=None, _const=False):
+ return cls(_timestamp=timestamp, _values=values, _subtypes=_subtypes,
+ _tag=_tag, _schema=_schema, _const=_const)
+ create = classmethod(_create)
- def getTimestamp(self): return self._timestamp
+ def _from_map(cls, map_, _schema=None, _const=False):
+ return cls(_map=map_, _schema=_schema, _const=_const)
+ from_map = classmethod(_from_map)
- def getAgentId(self): return self._agentId
+ def get_timestamp(self):
+ return self._timestamp
- def mapEncode(self):
- _map = super(QmfEvent, self).mapEncode()
- _map["timestamp"] = self._timestamp
- _map["agent_id"] = self._agentId.mapEncode()
+ def map_encode(self):
+ _map = super(QmfEvent, self).map_encode()
+ _map[self.KEY_TIMESTAMP] = self._timestamp
return _map
-class QmfObject(object):
- # attr_reader :impl, :object_class
- def __init__(self, cls, kwargs={}):
- pass
-# self._cv = Condition()
-# self._sync_count = 0
-# self._sync_result = None
-# self._allow_sets = False
-# if kwargs.has_key("broker"):
-# self._broker = kwargs["broker"]
-# else:
-# self._broker = None
-# if cls:
-# self.object_class = cls
-# self.impl = qmfengine.Object(self.object_class.impl)
-# elif kwargs.has_key("impl"):
-# self.impl = qmfengine.Object(kwargs["impl"])
-# self.object_class = SchemaObjectClass(None,
-# None,
-# {"impl":self.impl.getClass()})
-# else:
-# raise Exception("Argument error: required parameter ('impl') not supplied")
-# def destroy(self):
-# self.impl.destroy()
-# def object_id(self):
-# return ObjectId(self.impl.getObjectId())
-# def set_object_id(self, oid):
-# self.impl.setObjectId(oid.impl)
-# def properties(self):
-# list = []
-# for prop in
-# list.append([prop, self.get_attr(])
-# return list
-# def statistics(self):
-# list = []
-# for stat in self.object_class.statistics:
-# list.append([stat, self.get_attr(])
-# return list
-# def get_attr(self, name):
-# val = self._value(name)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.asUint()
-# elif vType == TYPE_UINT16: return val.asUint()
-# elif vType == TYPE_UINT32: return val.asUint()
-# elif vType == TYPE_UINT64: return val.asUint64()
-# elif vType == TYPE_SSTR: return val.asString()
-# elif vType == TYPE_LSTR: return val.asString()
-# elif vType == TYPE_ABSTIME: return val.asInt64()
-# elif vType == TYPE_DELTATIME: return val.asUint64()
-# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
-# elif vType == TYPE_BOOL: return val.asBool()
-# elif vType == TYPE_FLOAT: return val.asFloat()
-# elif vType == TYPE_DOUBLE: return val.asDouble()
-# elif vType == TYPE_UUID: return val.asUuid()
-# elif vType == TYPE_INT8: return val.asInt()
-# elif vType == TYPE_INT16: return val.asInt()
-# elif vType == TYPE_INT32: return val.asInt()
-# elif vType == TYPE_INT64: return val.asInt64()
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
-# return None
-# def set_attr(self, name, v):
-# val = self._value(name)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.setUint(v)
-# elif vType == TYPE_UINT16: return val.setUint(v)
-# elif vType == TYPE_UINT32: return val.setUint(v)
-# elif vType == TYPE_UINT64: return val.setUint64(v)
-# elif vType == TYPE_SSTR:
-# if v: return val.setString(v)
-# else: return val.setString('')
-# elif vType == TYPE_LSTR:
-# if v: return val.setString(v)
-# else: return val.setString('')
-# elif vType == TYPE_ABSTIME: return val.setInt64(v)
-# elif vType == TYPE_DELTATIME: return val.setUint64(v)
-# elif vType == TYPE_REF: return val.setObjectId(v.impl)
-# elif vType == TYPE_BOOL: return val.setBool(v)
-# elif vType == TYPE_FLOAT: return val.setFloat(v)
-# elif vType == TYPE_DOUBLE: return val.setDouble(v)
-# elif vType == TYPE_UUID: return val.setUuid(v)
-# elif vType == TYPE_INT8: return val.setInt(v)
-# elif vType == TYPE_INT16: return val.setInt(v)
-# elif vType == TYPE_INT32: return val.setInt(v)
-# elif vType == TYPE_INT64: return val.setInt64(v)
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
-# return None
-# def __getitem__(self, name):
-# return self.get_attr(name)
-# def __setitem__(self, name, value):
-# self.set_attr(name, value)
-# def inc_attr(self, name, by=1):
-# self.set_attr(name, self.get_attr(name) + by)
-# def dec_attr(self, name, by=1):
-# self.set_attr(name, self.get_attr(name) - by)
-# def _invokeMethod(self, name, argMap):
-# """
-# Private: Helper function that invokes an object's method, and waits for the result.
-# """
-# self._cv.acquire()
-# try:
-# timeout = 30
-# self._sync_count = 1
-# self.impl.invokeMethod(name, argMap, self)
-# if self._broker:
-# self._broker.conn.kick()
-# self._cv.wait(timeout)
-# if self._sync_count == 1:
-# raise Exception("Timed out: waiting for response to method call.")
-# finally:
-# self._cv.release()
-# return self._sync_result
-# def _method_result(self, result):
-# """
-# Called to return the result of a method call on an object
-# """
-# self._cv.acquire();
-# try:
-# self._sync_result = result
-# self._sync_count -= 1
-# self._cv.notify()
-# finally:
-# self._cv.release()
-# def _marshall(schema, args):
-# '''
-# Private: Convert a list of arguments (positional) into a Value object of type "map".
-# Used to create the argument parameter for an object's method invokation.
-# '''
-# # Build a map of the method's arguments
-# map = qmfengine.Value(TYPE_MAP)
-# for arg in schema.arguments:
-# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
-# map.insert(, qmfengine.Value(arg.typecode))
-# # install each argument's value into the map
-# marshalled = Arguments(map)
-# idx = 0
-# for arg in schema.arguments:
-# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
-# if args[idx]:
-# marshalled[] = args[idx]
-# idx += 1
-# return
-# def _value(self, name):
-# val = self.impl.getValue(name)
-# if not val:
-# raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" %
-# (name,
-# self.object_class.impl.getClassKey().getPackageName(),
-# self.object_class.impl.getClassKey().getClassName()))
-# return val
class Arguments(object):
def __init__(self, map):
@@ -1092,7 +754,7 @@ class MethodResponse(object):
-class QmfQuery(object):
+class QmfQuery(_mapEncoder):
@@ -1223,18 +885,19 @@ class QmfQuery(object):
def getPredicate(self):
return self._predicate
- def mapEncode(self):
+ def map_encode(self):
_map = {}
_map[self._TARGET] = self._target_map
- _map[self._PREDICATE] = self._predicate.mapEncode()
+ if self._predicate is not None:
+ _map[self._PREDICATE] = self._predicate.map_encode()
return _map
def __repr__(self):
- return str(self.mapEncode())
+ return "QmfQuery=<<" + str(self.map_encode()) + ">>"
-class QmfQueryPredicate(object):
+class QmfQueryPredicate(_mapEncoder):
Class for Query predicates.
@@ -1255,7 +918,6 @@ class QmfQueryPredicate(object):
logic_op = False
if type(pmap) == dict:
for key in pmap.iterkeys():
- logging.error("key = %s" % key)
if key in self._valid_cmp_ops:
# coparison operation - may have "name" and "value"
self._oper = key
@@ -1283,7 +945,6 @@ class QmfQueryPredicate(object):
Append another operand to a predicate expression
- logging.error("Appending: '%s'" % str(operand))
@@ -1311,15 +972,16 @@ class QmfQueryPredicate(object):
# @todo: support regular expression match
name = self._operands[0]
logging.debug("looking for: '%s'" % str(name))
- if not qmfData.hasProperty(name):
- logging.warning("Malformed query, attribute '%s' not present." % name)
+ if not qmfData.has_value(name):
+ logging.warning("Malformed query, attribute '%s' not present."
+ % name)
return False
- arg1 = qmfData.getProperty(name)
- arg1Type = type(arg1)
+ arg1 = qmfData.get_value(name)
+ arg2 = self._operands[1]
logging.debug("query evaluate %s: '%s' '%s' '%s'" %
- (name, str(arg1), self._oper, str(self._operands[1])))
+ (name, str(arg1), self._oper, str(arg2)))
- arg2 = arg1Type(self._operands[1])
if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2
if self._oper == QmfQuery._CMP_NE: return arg1 != arg2
if self._oper == QmfQuery._CMP_LT: return arg1 < arg2
@@ -1342,7 +1004,7 @@ class QmfQueryPredicate(object):
return False
name = self._operands[0]
logging.debug("query evaluate PRESENT: [%s]" % str(name))
- return qmfData.hasProperty(name)
+ return qmfData.has_value(name)
if self._oper == QmfQuery._LOGIC_AND:
logging.debug("query evaluate AND: '%s'" % str(self._operands))
@@ -1369,12 +1031,12 @@ class QmfQueryPredicate(object):
return False
- def mapEncode(self):
+ def map_encode(self):
_map = {}
_list = []
for exp in self._operands:
if isinstance(exp, QmfQueryPredicate):
- _list.append(exp.mapEncode())
+ _list.append(exp.map_encode())
_map[self._oper] = _list
@@ -1382,7 +1044,7 @@ class QmfQueryPredicate(object):
def __repr__(self):
- return str(self.mapEncode())
+ return "QmfQueryPredicate=<<" + str(self.map_encode()) + ">>"
@@ -1390,15 +1052,6 @@ class QmfQueryPredicate(object):
-# known schema types
-SchemaTypeData = "data"
-SchemaTypeEvent = "event"
-# format convention for schema hash
-_schemaHashStrFormat = "%08x-%08x-%08x-%08x"
-_schemaHashStrDefault = "00000000-00000000-00000000-00000000"
# Argument typecodes, access, and direction qualifiers
@@ -1458,7 +1111,7 @@ def _toBool( param ):
-class SchemaClassId(object):
+class SchemaClassId(_mapEncoder):
Unique identifier for an instance of a SchemaClass.
@@ -1470,20 +1123,40 @@ class SchemaClassId(object):
map["hash_str"] = str, hash value in standard format or None
if hash is unknown.
- def __init__(self, pname, cname, stype=SchemaTypeData, hstr=None):
+ KEY_PACKAGE="_package_name"
+ KEY_CLASS="_class_name"
+ KEY_TYPE="_type"
+ KEY_HASH="_hash_str"
+ TYPE_DATA = "_data"
+ TYPE_EVENT = "event"
+ _valid_types=[TYPE_DATA, TYPE_EVENT]
+ _schemaHashStrFormat = "%08x-%08x-%08x-%08x"
+ _schemaHashStrDefault = "00000000-00000000-00000000-00000000"
+ def __init__(self, pname=None, cname=None, stype=TYPE_DATA, hstr=None,
+ _map=None):
@type pname: str
@param pname: the name of the class's package
@type cname: str
@param cname: name of the class
@type stype: str
- @param stype: schema type [SchemaTypeData | SchemaTypeEvent]
+ @param stype: schema type [data | event]
@type hstr: str
@param hstr: the hash value in '%08x-%08x-%08x-%08x' format
+ if _map is not None:
+ # construct from map
+ pname = _map.get(self.KEY_PACKAGE, pname)
+ cname = _map.get(self.KEY_CLASS, cname)
+ stype = _map.get(self.KEY_TYPE, stype)
+ hstr = _map.get(self.KEY_HASH, hstr)
self._pname = pname
self._cname = cname
- if stype != SchemaTypeData and stype != SchemaTypeEvent:
+ if stype not in SchemaClassId._valid_types:
raise TypeError("Invalid SchemaClassId type: '%s'" % stype)
self._type = stype
self._hstr = hstr
@@ -1498,8 +1171,17 @@ class SchemaClassId(object):
raise Exception("Invalid SchemaClassId format: bad hash string: '%s':"
% hstr)
+ # constructor
+ def _create(cls, pname, cname, stype=TYPE_DATA, hstr=None):
+ return cls(pname=pname, cname=cname, stype=stype, hstr=hstr)
+ create = classmethod(_create)
- def getPackageName(self):
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+ def get_package_name(self):
Access the package name in the SchemaClassId.
@@ -1508,7 +1190,7 @@ class SchemaClassId(object):
return self._pname
- def getClassName(self):
+ def get_class_name(self):
Access the class name in the SchemaClassId
@@ -1517,7 +1199,7 @@ class SchemaClassId(object):
return self._cname
- def getHashString(self):
+ def get_hash_string(self):
Access the schema's hash as a string value
@@ -1526,7 +1208,7 @@ class SchemaClassId(object):
return self._hstr
- def getType(self):
+ def get_type(self):
Returns the type code associated with this Schema
@@ -1534,27 +1216,25 @@ class SchemaClassId(object):
return self._type
- def mapEncode(self):
+ def map_encode(self):
_map = {}
- _map["package_name"] = self._pname
- _map["class_name"] = self._cname
- _map["type"] = self._type
- if self._hstr: _map["hash_str"] = self._hstr
+ _map[self.KEY_PACKAGE] = self._pname
+ _map[self.KEY_CLASS] = self._cname
+ _map[self.KEY_TYPE] = self._type
+ if self._hstr: _map[self.KEY_HASH] = self._hstr
return _map
def __repr__(self):
- if self._type == SchemaTypeEvent:
- stype = "event"
- else:
- stype = "data"
- hstr = self.getHashString()
+ hstr = self.get_hash_string()
if not hstr:
- hstr = _schemaHashStrDefault
- return self._pname + ":" + self._cname + ":" + stype + "(" + hstr + ")"
+ hstr = SchemaClassId._schemaHashStrDefault
+ return self._pname + ":" + self._cname + ":" + self._type + "(" + hstr + ")"
def __cmp__(self, other):
- if not isinstance(other, SchemaClassId) :
+ if isinstance(other, dict):
+ other = SchemaClassId.from_map(other)
+ if not isinstance(other, SchemaClassId):
raise TypeError("Invalid types for compare")
# return 1
me = str(self)
@@ -1571,55 +1251,7 @@ class SchemaClassId(object):
-def SchemaClassIdFactory( param ):
- """
- Factory for constructing SchemaClassIds from various sources
- @type param: various
- @param param: object to use for constructing SchemaClassId
- @rtype: SchemaClassId
- @returns: a new SchemaClassId instance
- """
- if type(param) == str:
- # construct from __repr__/__str__ representation
- try:
- pname, cname, rest = param.split(":")
- stype,rest = rest.split("(");
- hstr = rest.rstrip(")");
- if hstr == _schemaHashStrDefault:
- hstr = None
- return SchemaClassId( pname, cname, stype, hstr )
- except:
- raise TypeError("Invalid string format: '%s'" % param)
- if type(param) == dict:
- # construct from map representation
- if "package_name" in param:
- pname = param["package_name"]
- else:
- raise TypeError("'package_name' attribute is required")
- if "class_name" in param:
- cname = param["class_name"]
- else:
- raise TypeError("'class_name' attribute is required")
- hstr = None
- if "hash_str" in param:
- hstr = param["hash_str"]
- stype = "data"
- if "type" in param:
- stype = param["type"]
- return SchemaClassId( pname, cname, stype, hstr )
- else:
- raise TypeError("Invalid type for SchemaClassId construction")
-class SchemaProperty(object):
+class SchemaProperty(_mapEncoder):
Describes the structure of a Property data object.
Map format:
@@ -1641,8 +1273,17 @@ class SchemaProperty(object):
__hash__ = None
_access_strings = ["RO","RW","RC"]
- def __init__(self, typeCode, kwargs={}):
- self._type = typeCode
+ _dir_strings = ["I", "O", "IO"]
+ def __init__(self, _type_code=None, _map=None, kwargs={}):
+ if _map is not None:
+ # construct from map
+ _type_code = _map.get("amqp_type", _type_code)
+ kwargs = _map
+ if not _type_code:
+ raise TypeError("SchemaProperty: amqp_type is a mandatory"
+ " parameter")
+ self._type = _type_code
self._access = "RO"
self._isIndex = False
self._isOptional = False
@@ -1653,6 +1294,8 @@ class SchemaProperty(object):
self._desc = None
self._reference = None
self._isParentRef = False
+ self._dir = None
+ self._default = None
for key, value in kwargs.items():
if key == "access":
@@ -1669,6 +1312,22 @@ class SchemaProperty(object):
elif key == "desc" : self._desc = value
elif key == "reference" : self._reference = value
elif key == "parent_ref" : self._isParentRef = _toBool(value)
+ elif key == "dir":
+ value = str(value).upper()
+ if value not in self._dir_strings:
+ raise TypeError("invalid value for direction parameter: '%s'" % value)
+ self._dir = value
+ elif key == "default" : self._default = value
+ # constructor
+ def _create(cls, type_code, kwargs={}):
+ return cls(_type_code=type_code, kwargs=kwargs)
+ create = classmethod(_create)
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
def getType(self): return self._type
@@ -1692,7 +1351,11 @@ class SchemaProperty(object):
def isParentRef(self): return self._isParentRef
- def mapEncode(self):
+ def getDirection(self): return self._dir
+ def getDefault(self): return self._default
+ def map_encode(self):
Return the map encoding of this schema.
@@ -1708,9 +1371,12 @@ class SchemaProperty(object):
if self._desc: _map["desc"] = self._desc
if self._reference: _map["reference"] = self._reference
_map["parent_ref"] = self._isParentRef
+ if self._dir: _map["dir"] = self._dir
+ if self._default: _map["default"] = self._default
return _map
- def __repr__(self): return str(self.mapEncode())
+ def __repr__(self):
+ return "SchemaProperty=<<" + str(self.map_encode()) + ">>"
def _updateHash(self, hasher):
@@ -1722,122 +1388,21 @@ class SchemaProperty(object):
if self._access: hasher.update(self._access)
if self._unit: hasher.update(self._unit)
if self._desc: hasher.update(self._desc)
+ if self._dir: hasher.update(self._dir)
+ if self._default: hasher.update(self._default)
-def SchemaPropertyFactory( _map ):
- """
- Factory for constructing SchemaProperty from a map
- @type _map: dict
- @param _map: from mapEncode() of SchemaProperty
- @rtype: SchemaProperty
- @returns: a new SchemaProperty instance
- """
- if type(_map) == dict:
- # construct from map
- if "amqp_type" not in _map:
- raise TypeError("requires 'amqp_type' value")
- return SchemaProperty( _map["amqp_type"], _map )
- else:
- raise TypeError("Invalid type for SchemaProperty construction")
-class SchemaArgument(object):
- """
- Describes the structure of an Argument to a Method call.
- Map format:
- map["amqp_type"] = int, type code indicating argument's data type
- map["dir"] = str, direction for an argument associated with a
- Method, "I"|"O"|"IO", default value: "I"
- optional:
- map["desc"] = str, human-readable description of this argument
- map["default"] = by amqp_type, default value to use if none provided
- """
- __hash__ = None
- _dir_strings = ["I", "O", "IO"]
- def __init__(self, typeCode, kwargs={}):
- self._type = typeCode
- self._dir = "I"
- self._desc = None
- self._default = None
- for key, value in kwargs.items():
- if key == "dir":
- value = str(value).upper()
- if value not in self._dir_strings:
- raise TypeError("invalid value for dir parameter: '%s'" % value)
- self._dir = value
- elif key == "desc" : self._desc = value
- elif key == "default" : self._default = value
- def getType(self): return self._type
- def getDirection(self): return self._dir
- def getDesc(self): return self._desc
- def getDefault(self): return self._default
- def mapEncode(self):
- """
- Return the map encoding of this schema.
- """
- _map = {}
- _map["amqp_type"] = self._type
- _map["dir"] = self._dir
- # optional:
- if self._default: _map["default"] = self._default
- if self._desc: _map["desc"] = self._desc
- return _map
- def __repr__(self): return str(self.mapEncode())
- def _updateHash(self, hasher):
- """
- Update the given hash object with a hash computed over this schema.
- """
- hasher.update(str(self._type))
- hasher.update(self._dir)
- if self._desc: hasher.update(self._desc)
-def SchemaArgumentFactory( param ):
- """
- Factory for constructing SchemaArguments from various sources
- @type param: various
- @param param: object to use for constructing SchemaArgument
- @rtype: SchemaArgument
- @returns: a new SchemaArgument instance
- """
- if type(param) == dict:
- # construct from map
- if not "amqp_type" in param:
- raise TypeError("requires 'amqp_type' value")
- return SchemaArgument( param["amqp_type"], param )
- else:
- raise TypeError("Invalid type for SchemaArgument construction")
-class SchemaMethod(object):
+class SchemaMethod(_mapEncoder):
The SchemaMethod class describes the method's structure, and contains a
SchemaProperty class for each argument declared by the method.
Map format:
- map["arguments"] = map of "name"=<SchemaArgument> pairs.
+ map["arguments"] = map of "name"=<SchemaProperty> pairs.
map["desc"] = str, description of the method
- def __init__(self, args={}, _desc=None):
+ def __init__(self, args={}, _desc=None, _map=None):
Construct a SchemaMethod.
@@ -1846,9 +1411,22 @@ class SchemaMethod(object):
@type _desc: str
@param _desc: Human-readable description of the schema
+ if _map is not None:
+ _desc = _map.get("desc")
+ margs = _map.get("arguments", args)
+ # margs are in map format - covert to SchemaProperty
+ args = {}
+ for name,val in margs.iteritems():
+ args[name] = SchemaProperty.from_map(val)
self._arguments = args.copy()
self._desc = _desc
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
def getDesc(self): return self._desc
def getArgCount(self): return len(self._arguments)
@@ -1857,7 +1435,7 @@ class SchemaMethod(object):
def getArgument(self, name): return self._arguments[name]
- def addArgument(self, name, schema):
+ def add_argument(self, name, schema):
Add an argument to the list of arguments passed to this method.
Used by an agent for dynamically creating method schema.
@@ -1871,29 +1449,28 @@ class SchemaMethod(object):
raise TypeError("argument must be a SchemaProperty class")
self._arguments[name] = schema
- def mapEncode(self):
+ def map_encode(self):
Return the map encoding of this schema.
_map = {}
_args = {}
for name,val in self._arguments.iteritems():
- _args[name] = val.mapEncode()
+ _args[name] = val.map_encode()
_map["arguments"] = _args
if self._desc: _map["desc"] = self._desc
return _map
def __repr__(self):
- result = "("
+ result = "SchemaMethod=<<args=("
first = True
for name,arg in self._arguments.iteritems():
- if arg._dir.find("I") != -1:
- if first:
- first = False
- else:
- result += ", "
- result += name
- result += ")"
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += name
+ result += ")>>"
return result
def _updateHash(self, hasher):
@@ -1907,89 +1484,77 @@ class SchemaMethod(object):
-def SchemaMethodFactory( param ):
+class SchemaClass(QmfData):
- Factory for constructing a SchemaMethod from various sources
+ Base class for Data and Event Schema classes.
- @type param: various
- @param param: object to use for constructing a SchemaMethod
- @rtype: SchemaMethod
- @returns: a new SchemaMethod instance
+ Map format:
+ map(QmfData), plus:
+ map["_schema_id"] = map representation of a SchemaClassId instance
+ map["_primary_key_names"] = order list of primary key names
- if type(param) == dict:
- # construct from map
- args = {}
- desc = None
- if "arguments" in param:
- for name,val in param["arguments"].iteritems():
- args[name] = SchemaArgumentFactory(val)
- if "desc" in param:
- desc = param["desc"]
- return SchemaMethod( args, desc )
- else:
- raise TypeError("Invalid type for SchemaMethod construction")
+ KEY_SCHEMA_ID="_schema_id"
+ KEY_PRIMARY_KEY_NAMES="_primary_key_names"
+ KEY_DESC = "_desc"
+ SUBTYPE_PROPERTY="qmfProperty"
+ SUBTYPE_METHOD="qmfMethod"
-class SchemaClass(QmfData):
- """
- Base class for Data and Event Schema classes.
- """
- def __init__(self, pname, cname, stype, desc=None, hstr=None):
+ def __init__(self, _classId=None, _desc=None, _map=None):
Schema Class constructor.
- @type pname: str
- @param pname: package name
- @type cname: str
- @param cname: class name
- @type stype: str
- @param stype: type of schema, either "data" or "event"
- @type desc: str
- @param desc: Human-readable description of the schema
- @type hstr: str
- @param hstr: hash computed over the schema body, else None
+ @type classId: class SchemaClassId
+ @param classId: Identifier for this SchemaClass
+ @type _desc: str
+ @param _desc: Human-readable description of the schema
- self._pname = pname
- self._cname = cname
- self._type = stype
- self._desc = desc
- if hstr:
- self._classId = SchemaClassId( pname, cname, stype, hstr )
+ if _map is not None:
+ super(SchemaClass, self).__init__(_map=_map)
+ # decode each value based on its type
+ for name,value in self._values.iteritems():
+ if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+ self._values[name] = SchemaMethod.from_map(value)
+ else:
+ self._values[name] = SchemaProperty.from_map(value)
+ cid = _map.get(self.KEY_SCHEMA_ID)
+ if cid:
+ _classId = SchemaClassId.from_map(cid)
+ self._object_id_names = _map.get(self.KEY_PRIMARY_KEY_NAMES,[])
+ _desc = _map.get(self.KEY_DESC)
- self._classId = None
- self._properties = {}
- self._methods = {}
- self._pkeyNames = []
+ super(SchemaClass, self).__init__()
+ self._object_id_names = []
+ self._classId = _classId
+ self._desc = _desc
- def getClassId(self):
- if not self._classId:
- self.generateHash()
+ def get_class_id(self):
+ if not self._classId.get_hash_string():
+ self.generate_hash()
return self._classId
- def getDesc(self): return self._desc
+ def get_desc(self): return self._desc
- def generateHash(self):
+ def generate_hash(self):
generate an md5 hash over the body of the schema,
and return a string representation of the hash
in format "%08x-%08x-%08x-%08x"
md5Hash = _md5Obj()
- md5Hash.update(self._pname)
- md5Hash.update(self._cname)
- md5Hash.update(self._type)
- for name,x in self._properties.iteritems():
+ md5Hash.update(self._classId.get_package_name())
+ md5Hash.update(self._classId.get_class_name())
+ md5Hash.update(self._classId.get_type())
+ for name,x in self._values.iteritems():
x._updateHash( md5Hash )
- for name,x in self._methods.iteritems():
+ for name,value in self._subtypes.iteritems():
- x._updateHash( md5Hash )
+ md5Hash.update(value)
idx = 0
- for name in self._pkeyNames:
+ for name in self._object_id_names:
md5Hash.update(str(idx) + name)
idx += 1
hstr = md5Hash.hexdigest()[0:8] + "-" +\
@@ -1997,65 +1562,70 @@ class SchemaClass(QmfData):
md5Hash.hexdigest()[16:24] + "-" +\
# update classId with new hash value
- self._classId = SchemaClassId( self._pname,
- self._cname,
- self._type,
- hstr )
+ self._classId._hstr = hstr
return hstr
- def getPropertyCount(self): return len(self._properties)
- def getProperties(self): return self._properties.copy()
- def addProperty(self, name, prop):
- self._properties[name] = prop
+ def get_property_count(self):
+ count = 0
+ for value in self._subtypes.itervalues():
+ if value == self.SUBTYPE_PROPERTY:
+ count += 1
+ return count
+ def get_properties(self):
+ props = {}
+ for name,value in self._subtypes.iteritems():
+ if value == self.SUBTYPE_PROPERTY:
+ props[name] = self._values.get(name)
+ return props
+ def get_property(self, name):
+ if self._subtypes.get(name) == self.SUBTYPE_PROPERTY:
+ return self._values.get(name)
+ return None
+ def add_property(self, name, prop):
+ self.set_value(name, prop, self.SUBTYPE_PROPERTY)
# need to re-generate schema hash
- self._classId = None
+ self._classId._hstr = None
- def getProperty(self, name):
+ def get_value(self, name):
# check for meta-properties first
- if name == QmfQuery._PRED_PACKAGE:
- return self._pname
- if name == QmfQuery._PRED_CLASS:
- return self._cname
- if name == QmfQuery._PRED_TYPE:
- return self._type
- if name == QmfQuery._PRED_HASH:
- return self.getClassId().getHashString()
- if name == QmfQuery._PRED_SCHEMA_ID:
- return self.getClassId()
- return super(SchemaClass, self).getProperty(name)
- def hasProperty(self, name):
- if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS,
- QmfQuery._PRED_TYPE, QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID]:
+ if name == SchemaClassId.KEY_PACKAGE:
+ return self._classId.get_package_name()
+ if name == SchemaClassId.KEY_CLASS:
+ return self._classId.get_class_name()
+ if name == SchemaClassId.KEY_TYPE:
+ return self._classId.get_type()
+ if name == SchemaClassId.KEY_HASH:
+ return self.get_class_id().get_hash_string()
+ if name == self.KEY_SCHEMA_ID:
+ return self.get_class_id()
+ if name == self.KEY_PRIMARY_KEY_NAMES:
+ return self._object_id_names[:]
+ return super(SchemaClass, self).get_value(name)
+ def has_value(self, name):
+ if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS, SchemaClassId.KEY_TYPE,
return True
- super(SchemaClass, self).hasProperty(name)
+ super(SchemaClass, self).has_value(name)
- def mapEncode(self):
+ def map_encode(self):
Return the map encoding of this schema.
- _map = {}
- _map["schema_id"] = self.getClassId().mapEncode()
- _map["desc"] = self._desc
- if len(self._properties):
- _props = {}
- for name,val in self._properties.iteritems():
- _props[name] = val.mapEncode()
- _map["properties"] = _props
- if len(self._methods):
- _meths = {}
- for name,val in self._methods.iteritems():
- _meths[name] = val.mapEncode()
- _map["methods"] = _meths
- if len(self._pkeyNames):
- _map["primary_key"] = self._pkeyNames[:]
+ _map = super(SchemaClass,self).map_encode()
+ _map[self.KEY_SCHEMA_ID] = self.get_class_id().map_encode()
+ if self._object_id_names:
+ _map[self.KEY_PRIMARY_KEY_NAMES] = self._object_id_names[:]
+ if self._desc:
+ _map[self.KEY_DESC] = self._desc
return _map
def __repr__(self):
- return str(self.getClassId())
+ return str(self.get_class_id())
@@ -2067,14 +1637,11 @@ class SchemaObjectClass(SchemaClass):
all properties named in the primary key list.
Map format:
- map["schema_id"] = map, SchemaClassId map for this object.
- map["desc"] = human readable description of this schema.
- map["primary_key"] = ordered list of property names used to construct the Primary Key"
- map["properties"] = map of "name":SchemaProperty instances.
- map["methods"] = map of "name":SchemaMethods instances.
+ map(SchemaClass)
- def __init__( self, pname, cname, desc=None, _hash=None,
- _props={}, _pkey=[], _methods={}):
+ def __init__(self, _classId=None, _desc=None,
+ _props={}, _methods={}, _object_id_names=None,
+ _map=None):
@type pname: str
@param pname: name of package this schema belongs to
@@ -2091,64 +1658,46 @@ class SchemaObjectClass(SchemaClass):
@type _methods: map of 'name':<SchemaMethod> objects
@param _methods: all methods provided by this schema
- super(SchemaObjectClass, self).__init__(pname,
- cname,
- SchemaTypeData,
- desc,
- _hash)
- self._properties = _props.copy()
- self._pkeyNames = _pkey[:]
- self._methods = _methods.copy()
- def getPrimaryKeyList(self): return self._pkeyNames[:]
- def getMethodCount(self): return len(self._methods)
- def getMethods(self): return self._methods.copy()
- def getMethod(self, name): return self._methods[name]
- def addMethod(self, name, method):
- self._methods[name] = method
- self._classId = None
-def SchemaObjectClassFactory( param ):
- """
- Factory for constructing a SchemaObjectClass from various sources.
+ if _map is not None:
+ super(SchemaObjectClass,self).__init__(_map=_map)
+ else:
+ super(SchemaObjectClass, self).__init__(_classId=_classId, _desc=_desc)
+ self._object_id_names = _object_id_names
+ for name,value in _props.iteritems():
+ self.set_value(name, value, self.SUBTYPE_PROPERTY)
+ for name,value in _methods.iteritems():
+ self.set_value(name, value, self.SUBTYPE_METHOD)
+ if self._classId.get_type() != SchemaClassId.TYPE_DATA:
+ raise TypeError("Invalid ClassId type for data schema: %s" % self._classId)
+ def get_id_names(self):
+ return self._object_id_names[:]
+ def get_method_count(self):
+ count = 0
+ for value in self._subtypes.itervalues():
+ if value == self.SUBTYPE_METHOD:
+ count += 1
+ return count
+ def get_methods(self):
+ meths = {}
+ for name,value in self._subtypes.iteritems():
+ if value == self.SUBTYPE_METHOD:
+ meths[name] = self._values.get(name)
+ return meths
+ def get_method(self, name):
+ if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+ return self._values.get(name)
+ return None
- @type param: various
- @param param: object to use for constructing a SchemaObjectClass instance
- @rtype: SchemaObjectClass
- @returns: a new SchemaObjectClass instance
- """
- if type(param) == dict:
- classId = None
- properties = {}
- methods = {}
- pkey = []
- if "schema_id" in param:
- classId = SchemaClassIdFactory(param["schema_id"])
- if (not classId) or (classId.getType() != SchemaTypeData):
- raise TypeError("Invalid SchemaClassId specified: %s" % classId)
- if "desc" in param:
- desc = param["desc"]
- if "primary_key" in param:
- pkey = param["primary_key"]
- if "properties" in param:
- for name,val in param["properties"].iteritems():
- properties[name] = SchemaPropertyFactory(val)
- if "methods" in param:
- for name,val in param["methods"].iteritems():
- methods[name] = SchemaMethodFactory(val)
- return SchemaObjectClass( classId.getPackageName(),
- classId.getClassName(),
- desc,
- _hash = classId.getHashString(),
- _props = properties, _pkey = pkey,
- _methods = methods)
+ def add_method(self, name, method):
+ self.set_value(name, method, self.SUBTYPE_METHOD)
+ # need to re-generate schema hash
+ self._classId._hstr = None
- else:
- raise TypeError("Invalid type for SchemaObjectClass construction")
@@ -2162,46 +1711,21 @@ class SchemaEventClass(SchemaClass):
map["desc"] = string description of this schema
map["properties"] = map of "name":SchemaProperty values.
- def __init__( self, pname, cname, desc=None, _props={}, _hash=None ):
- super(SchemaEventClass, self).__init__(pname,
- cname,
- SchemaTypeEvent,
- desc,
- _hash )
- self._properties = _props.copy()
+ def __init__(self, _classId=None, _desc=None, _props={},
+ _map=None):
+ if _map is not None:
+ super(SchemaEventClass,self).__init__(_map=_map)
+ else:
+ super(SchemaEventClass, self).__init__(_classId=_classId,
+ _desc=_desc)
+ for name,value in _props.iteritems():
+ self.set_value(name, value, self.SUBTYPE_PROPERTY)
+ if self._classId.get_type() != SchemaClassId.TYPE_EVENT:
+ raise TypeError("Invalid ClassId type for event schema: %s" %
+ self._classId)
-def SchemaEventClassFactory( param ):
- """
- Factory for constructing a SchemaEventClass from various sources.
- @type param: various
- @param param: object to use for constructing a SchemaEventClass instance
- @rtype: SchemaEventClass
- @returns: a new SchemaEventClass instance
- """
- if type(param) == dict:
- logging.debug( "constructing SchemaEventClass from map '%s'" % param )
- classId = None
- properties = {}
- if "schema_id" in param:
- classId = SchemaClassIdFactory(param["schema_id"])
- if (not classId) or (classId.getType() != SchemaTypeEvent):
- raise TypeError("Invalid SchemaClassId specified: %s" % classId)
- if "desc" in param:
- desc = param["desc"]
- if "properties" in param:
- for name,val in param["properties"].iteritems():
- properties[name] = SchemaPropertyFactory(val)
- return SchemaEventClass( classId.getPackageName(),
- classId.getClassName(),
- desc,
- _hash = classId.getHashString(),
- _props = properties )
- else:
- raise TypeError("Invalid type for SchemaEventClass construction")
diff --git a/qpid/python/qmf/ b/qpid/python/qmf/
index cc8c284579..027ce163e5 100644
--- a/qpid/python/qmf/
+++ b/qpid/python/qmf/
@@ -30,10 +30,9 @@ from threading import Condition
from qpid.messaging import *
- AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode,
- QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey,
- QmfData)
+from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
+ QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
@@ -186,69 +185,106 @@ class SequencedWaiter(object):
return False
-#class ObjectProxy(QmfObject):
-class ObjectProxy(object):
+class QmfConsoleData(QmfData):
- A local representation of a QmfObject that is managed by a remote agent.
+ Console's representation of an managed QmfData instance.
- def __init__(self, agent, cls, kwargs={}):
+ def __init__(self, map_, agent, _schema=None):
+ super(QmfConsoleData, self).__init__(_map=map_,
+ _schema=_schema,
+ _const=True)
+ self._agent = agent
+ def get_timestamps(self):
- @type agent: qmfConsole.AgentProxy
- @param agent: Agent that manages this object.
- @type cls: qmfCommon.SchemaObjectClass
- @param cls: Schema that describes the class.
- @type kwargs: dict
- @param kwargs: ??? supported keys ???
+ Returns a list of timestamps describing the lifecycle of
+ the object. All timestamps are represented by the AMQP
+ timestamp type. [0] = time of last update from Agent,
+ [1] = creation timestamp
+ [2] = deletion timestamp, or zero if not
+ deleted.
- # QmfObject.__init__(self, cls, kwargs)
- self._agent = agent
+ return [self._utime, self._ctime, self._dtime]
+ def get_create_time(self):
+ """
+ returns the creation timestamp
+ """
+ return self._ctime
- # def update(self):
- def refresh(self, timeout = None):
+ def get_update_time(self):
+ """
+ returns the update timestamp
- Called to re-fetch the current state of the object from the agent. This updates
- the contents of the object to their most current values.
+ return self._utime
- @rtype: bool
- @return: True if refresh succeeded. Refresh may fail if agent does not respond.
+ def get_delete_time(self):
+ """
+ returns the deletion timestamp, or zero if not yet deleted.
- if not self._agent:
- raise Exception("No Agent associated with this object")
- newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout)
- if newer == None:
- logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
- raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
- #self.mergeUpdate(newer) ### ??? in Rafi's Class
+ return self._dtime
- ### def _merge_update(self, newerObject):
- ### ??? in Rafi's Class
+ def is_deleted(self):
+ """
+ True if deletion timestamp not zero.
+ """
+ return self._dtime != long(0)
+ def refresh(self, _reply_handle=None, _timeout=None):
+ """
+ request that the Agent update the value of this object's
+ contents.
+ """
+ logging.error(" TBD!!!")
+ return None
- ### def is_deleted(self):
- ### ??? in Rafi's Class
+ def invoke_method(self, name, _in_args=None, _reply_handle=None,
+ _timeout=None):
+ """
+ invoke the named method.
+ """
+ logging.error(" TBD!!!")
+ return None
- def key(self): pass
+class QmfLocalData(QmfData):
+ """
+ Console's representation of an unmanaged QmfData instance. There
+ is no remote agent associated with this instance. The Console has
+ full control over this instance.
+ """
+ def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfLocalData, self).__init__(_values=values,
+ _subtypes=_subtypes, _tag=_tag,
+ _object_id=_object_id,
+ _schema=_schema, _ctime=ctime,
+ _utime=ctime, _const=False)
class Agent(object):
A local representation of a remote agent managed by this console.
- def __init__(self, agent_id, console):
+ def __init__(self, name, console):
@type name: AgentId
@param name: uniquely identifies this agent in the AMQP domain.
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
if not isinstance(console, Console):
raise TypeError("parameter must be an instance of class Console")
- self._id = agent_id
- self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)
+ self._name = name
+ self._address =, console._domain)
self._console = console
self._sender = None
self._packages = {} # map of {package-name:[list of class-names], } for this agent
@@ -257,8 +293,8 @@ class Agent(object):
logging.debug( "Created Agent with address: [%s]" % self._address )
- def getAgentId(self):
- return self._id
+ def get_name(self):
+ return self._name
def isActive(self):
return self._announce_timestamp != None
@@ -267,7 +303,7 @@ class Agent(object):
Low-level routine to asynchronously send a message to this agent.
- msg.reply_to = self._console.address()
+ msg.reply_to = str(self._console._address)
# handle = self._console._req_correlation.allocate()
# if handle == 0:
# raise Exception("Can not allocate a correlation id!")
@@ -329,7 +365,7 @@ class Agent(object):
def __repr__(self):
- return self._address
+ return str(self._address)
def __str__(self):
return self.__repr__()
@@ -339,7 +375,7 @@ class Agent(object):
msg = Message(subject=makeSubject(OpCode.get_query),
- content={MsgKey.query: query.mapEncode()})
+ content={MsgKey.query: query.map_encode()})
self._sendMsg( msg, correlation_id )
@@ -389,7 +425,7 @@ class Console(Thread):
A Console manages communications to a collection of agents on behalf of an application.
- def __init__(self, name=None, notifier=None,
+ def __init__(self, name=None, _domain=None, notifier=None,
reply_timeout = 60,
# agent_timeout = 120,
agent_timeout = 60,
@@ -403,10 +439,12 @@ class Console(Thread):
@param kwargs: ??? Unused
- self._name = name
- if not self._name:
+ if not name:
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
- self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name
+ else:
+ self._name = str(name)
+ self._domain = _domain
+ self._address =, self._domain)
self._notifier = notifier
self._lock = Lock()
self._conn = None
@@ -467,10 +505,24 @@ class Console(Thread):
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
- self._direct_recvr = self._session.receiver(self._address, capacity=1)
- self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION,
+ self._direct_recvr = self._session.receiver(str(self._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}",
+ capacity=1)
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+ logging.debug("agent.ind addr=%s" % ind_addr)
+ self._announce_recvr = self._session.receiver(str(ind_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}",
- self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE)
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ logging.debug("agent.locate addr=%s" % locate_addr)
+ self._locate_sender = self._session.sender(str(locate_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}")
# Now that receivers are created, fire off the receive thread...
@@ -495,7 +547,7 @@ class Console(Thread):
if self.isAlive():
# kick my thread to wake it up
logging.debug("Making temp sender for [%s]" % self._address)
- tmp_sender = self._session.sender(self._address)
+ tmp_sender = self._session.sender(str(self._address))
msg = Message(subject=makeSubject(OpCode.noop))
tmp_sender.send( msg, sync=True )
@@ -535,21 +587,17 @@ class Console(Thread):
- def findAgent(self, agent_id, timeout=None ):
+ def findAgent(self, name, timeout=None ):
Given the id of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
- if agent_id in self._agent_map:
- return self._agent_map[agent_id]
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
@@ -559,17 +607,21 @@ class Console(Thread):
if handle == 0:
raise Exception("Can not allocate a correlation id!")
- tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id))
+ tmp_sender = self._session.sender(str(,
+ self._domain))
+ + ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
- {QmfQuery._LOGIC_AND:
- [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]},
- {QmfQuery._CMP_EQ: ["product", agent_id.product()]},
- {QmfQuery._CMP_EQ: ["name",]}]}})
+ {QmfQuery._CMP_EQ: ["_name", name]}})
msg = Message(subject=makeSubject(OpCode.agent_locate),
- content={MsgKey.query: query.mapEncode()})
- msg.reply_to = self._address
+ content={MsgKey.query: query.map_encode()})
+ msg.reply_to = str(self._address)
msg.correlation_id = str(handle)
logging.debug("Sending Agent Locate (%s)" % time.time())
tmp_sender.send( msg )
@@ -588,13 +640,43 @@ class Console(Thread):
logging.debug("Agent Locate wait ended (%s)" % time.time())
- if agent_id in self._agent_map:
- new_agent = self._agent_map[agent_id]
+ new_agent = self._agent_map.get(name)
return new_agent
+ def doQuery(self, agent, query, timeout=None ):
+ """
+ """
+ handle = self._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+ try:
+ logging.debug("Sending Query to Agent (%s)" % time.time())
+ agent._sendQuery(query, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._req_correlation.release(handle)
+ return None
+ if not timeout:
+ timeout = self._reply_timeout
+ logging.debug("Waiting for response to Query (%s)" % timeout)
+ reply = self._req_correlation.get_data(handle, timeout)
+ self._req_correlation.release(handle)
+ logging.debug("Agent Query wait ended (%s)" % time.time())
+ if reply:
+ print("Agent Query Reply='%s'" % reply)
+ return reply.content
+ else:
+ print("Agent Query FAILED!!!")
+ return None
def run(self):
global _callback_thread
@@ -607,7 +689,6 @@ class Console(Thread):
msg = self._announce_recvr.fetch(timeout = 0)
if msg:
- logging.error( "Announce Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=False)
except Empty:
@@ -615,7 +696,6 @@ class Console(Thread):
msg = self._direct_recvr.fetch(timeout = 0)
if msg:
- logging.error( "Direct Msg Rcvd@%s: [%s]" % (time.time(), msg) )
self._dispatch(msg, _direct=True)
except Empty:
@@ -654,6 +734,9 @@ class Console(Thread):
PRIVATE: Process a message received from an Agent
+ logging.error( "Message received from Agent! [%s]" % msg )
version,opcode = parseSubject(msg.subject)
# @todo: deal with version mismatch!!!
@@ -670,7 +753,7 @@ class Console(Thread):
if opcode == OpCode.agent_ind:
self._handleAgentIndMsg( msg, cmap, version, _direct )
elif opcode == OpCode.data_ind:
- logging.warning("!!! data_ind TBD !!!")
+ self._handleDataIndMsg(msg, cmap, version, _direct)
elif opcode == OpCode.event_ind:
logging.warning("!!! event_ind TBD !!!")
elif opcode == OpCode.managed_object:
@@ -696,7 +779,8 @@ class Console(Thread):
if MsgKey.agent_info in cmap:
- agent_id = AgentIdFactory(cmap[MsgKey.agent_info])
+ # TODO: fix
+ name = cmap[MsgKey.agent_info]["_name"]
logging.warning("Bad agent-ind message received: '%s'" % msg)
@@ -709,21 +793,22 @@ class Console(Thread):
if direct and correlated:
ignore = False
elif self._agent_discovery_filter:
- matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
- ignore = not matched
+ logging.error("FIXME: agent discovery filter - new agent name style")
+ # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
+ # ignore = not matched
+ matched = True; ignore = False # for now
if not ignore:
agent = None
- if agent_id in self._agent_map:
- agent = self._agent_map[agent_id]
+ agent = self._agent_map.get(name)
if not agent:
# need to create and add a new agent
- agent = self._createAgent(agent_id)
+ agent = self._createAgent(name)
# lock out expiration scanning code
@@ -746,6 +831,22 @@ class Console(Thread):
+ def _handleDataIndMsg(self, msg, cmap, version, direct):
+ """
+ Process a received data-ind message.
+ """
+ logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
+ if not self._req_correlation.isValid(msg.correlation_id):
+ logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg))
+ return
+ # wake up all waiters
+ logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
def _expireAgents(self):
Check for expired agents and issue notifications when they expire.
@@ -777,21 +878,26 @@ class Console(Thread):
- def _createAgent( self, agent_id ):
+ def _createAgent( self, name ):
Factory to create/retrieve an agent for this console
- if not isinstance(agent_id, AgentId):
- raise TypeError("parameter must be an instance of class AgentId")
- if agent_id in self._agent_map:
- return self._agent_map[agent_id]
- agent = Agent(agent_id, self)
- agent._sender = self._session.sender(agent._address)
- self._agent_map[agent_id] = agent
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
+ agent = Agent(name, self)
+ agent._sender = self._session.sender(str(agent._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ self._agent_map[name] = agent
@@ -1235,33 +1341,13 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
- from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory,
- SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed,
- QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
- QmfEvent)
- logging.getLogger().setLevel(logging.INFO)
- "Starting Connection" )
- _c = Connection("localhost")
- _c.connect()
- #c.start()
+ from qmfCommon import (qmfTypes, QmfData,
+ QmfEvent, SchemaClassId, SchemaEventClass,
+ SchemaProperty, SchemaObjectClass)
- "Starting Console" )
- _myConsole = Console()
- _myConsole.addConnection( _c )
- "Finding Agent" )
- _myAgent = _myConsole.findAgent( AgentId( "", "agent", "tross" ), 5 )
- "Agent Found: %s" % _myAgent )
- "Removing connection" )
- _myConsole.removeConnection( _c, 10 )
- "Destroying console:" )
- _myConsole.destroy( 10 )
+ logging.getLogger().setLevel(logging.INFO)
- "************* Starting Async Console **************" )
+ "************* Creating Async Console **************" )
class MyNotifier(Notifier):
def __init__(self, context):
@@ -1275,239 +1361,179 @@ if __name__ == '__main__':
_noteMe = MyNotifier( 666 )
_myConsole = Console(notifier=_noteMe)
- _myConsole.addConnection( _c )
- while not _noteMe.WorkAvailable:
- try:
- print("No work yet...sleeping!")
- time.sleep(1)
- except KeyboardInterrupt:
- break
- print("Work available = %d items!" % _myConsole.getWorkItemCount())
- _wi = _myConsole.getNextWorkItem(timeout=0)
- while _wi:
- print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _myConsole.getNextWorkItem(timeout=0)
- "Removing connection" )
- _myConsole.removeConnection( _c, 10 )
- "Destroying console:" )
_myConsole.destroy( 10 ) "******** Messing around with Schema ********" )
- _sec = SchemaEventClassFactory( { "schema_id": # SchemaClassId map
- {"package_name": "myPackage",
- "class_name": "myClass",
- "type": "event"},
- "desc": "A typical event schema",
- "properties": {"Argument-1":
- {"amqp_type": qmfTypes.TYPE_UINT8,
- "min": 0,
- "max": 100,
- "unit": "seconds",
- "desc": "sleep value"},
- "Argument-2":
- {"amqp_type": qmfTypes.TYPE_LSTR,
- "maxlen": 100,
- "desc": "a string argument"}}} )
- print("_sec=%s" % _sec.getClassId())
- print("_sec.gePropertyCount()=%d" % _sec.getPropertyCount() )
- print("_sec.getProperty('Argument-1`)=%s" % _sec.getProperty('Argument-1') )
- print("_sec.getProperty('Argument-2`)=%s" % _sec.getProperty('Argument-2') )
+ _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A typical event schema",
+ _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
+ kwargs = {"min":0,
+ "max":100,
+ "unit":"seconds",
+ "desc":"sleep value"}),
+ "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
+ kwargs={"maxlen":100,
+ "desc":"a string argument"})})
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
- print("_sec.getProperty('not-found')=%s" % _sec.getProperty('not-found') )
+ print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
- print("_sec.getProperties()='%s'" % _sec.getProperties())
+ print("_sec.getProperties()='%s'" % _sec.get_properties())
print("Adding another argument")
- _arg3 = SchemaPropertyFactory( { "amqp_type": qmfTypes.TYPE_BOOL,
- "dir": "IO",
- "desc": "a boolean argument"} )
- _sec.addProperty('Argument-3', _arg3)
- print("_sec=%s" % _sec.getClassId())
- print("_sec.getPropertyCount()=%d" % _sec.getPropertyCount() )
- print("_sec.getProperty('Argument-1')=%s" % _sec.getProperty('Argument-1') )
- print("_sec.getProperty('Argument-2')=%s" % _sec.getProperty('Argument-2') )
- print("_sec.getProperty('Argument-3')=%s" % _sec.getProperty('Argument-3') )
- print("_arg3.mapEncode()='%s'" % _arg3.mapEncode() )
- _secmap = _sec.mapEncode()
+ _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
+ kwargs={"dir":"IO",
+ "desc":"a boolean argument"})
+ _sec.add_property('Argument-3', _arg3)
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
+ print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
+ print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
+ _secmap = _sec.map_encode()
print("_sec.mapEncode()='%s'" % _secmap )
- _sec2 = SchemaEventClassFactory( _secmap )
- print("_sec=%s" % _sec.getClassId())
- print("_sec2=%s" % _sec2.getClassId())
- _soc = SchemaObjectClassFactory( {"schema_id": {"package_name": "myOtherPackage",
- "class_name": "myOtherClass",
- "type": "data"},
- "desc": "A test data object",
- "properties":
- {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RO",
- "index": True,
- "unit": "degrees"},
- "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RW",
- "index": True,
- "desc": "The Second Property(tm)",
- "unit": "radians"},
- "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
- "unit": "seconds",
- "desc": "time until I retire"}},
- "methods":
- {"meth1": {"desc": "A test method",
- "arguments":
- {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an argument 1",
- "dir": "I"},
- "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
- "dir": "IO",
- "desc": "some weird boolean"}}},
- "meth2": {"desc": "A test method",
- "arguments":
- {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an 'nuther argument",
- "dir": "I"}}}},
- "primary_key": ["prop2", "prop1"]})
+ _sec2 = SchemaEventClass( _map=_secmap )
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec2=%s" % _sec2.get_class_id())
+ _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
+ "_class_name": "myOtherClass",
+ "_type": "_data"},
+ "_desc": "A test data object",
+ "_values":
+ {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RO",
+ "index": True,
+ "unit": "degrees"},
+ "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RW",
+ "index": True,
+ "desc": "The Second Property(tm)",
+ "unit": "radians"},
+ "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
+ "unit": "seconds",
+ "desc": "time until I retire"},
+ "meth1": {"desc": "A test method",
+ "arguments":
+ {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an argument 1",
+ "dir": "I"},
+ "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
+ "dir": "IO",
+ "desc": "some weird boolean"}}},
+ "meth2": {"desc": "A test method",
+ "arguments":
+ {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an 'nuther argument",
+ "dir":
+ "I"}}}},
+ "_subtypes":
+ {"prop1":"qmfProperty",
+ "prop2":"qmfProperty",
+ "statistics":"qmfProperty",
+ "meth1":"qmfMethod",
+ "meth2":"qmfMethod"},
+ "_primary_key_names": ["prop2", "prop1"]})
print("_soc='%s'" % _soc)
- print("_soc.getPrimaryKeyList='%s'" % _soc.getPrimaryKeyList())
+ print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
- print("_soc.getPropertyCount='%d'" % _soc.getPropertyCount())
- print("_soc.getProperties='%s'" % _soc.getProperties())
- print("_soc.getProperty('prop2')='%s'" % _soc.getProperty('prop2'))
+ print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
+ print("_soc.getProperties='%s'" % _soc.get_properties())
+ print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
- print("_soc.getMethodCount='%d'" % _soc.getMethodCount())
- print("_soc.getMethods='%s'" % _soc.getMethods())
- print("_soc.getMethod('meth2')='%s'" % _soc.getMethod('meth2'))
+ print("_soc.getMethodCount='%d'" % _soc.get_method_count())
+ print("_soc.getMethods='%s'" % _soc.get_methods())
+ print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
- _socmap = _soc.mapEncode()
+ _socmap = _soc.map_encode()
print("_socmap='%s'" % _socmap)
- _soc2 = SchemaObjectClassFactory( _socmap )
+ _soc2 = SchemaObjectClass( _map=_socmap )
print("_soc='%s'" % _soc)
print("_soc2='%s'" % _soc2)
- if _soc2.getClassId() == _soc.getClassId():
+ if _soc2.get_class_id() == _soc.get_class_id():
print("soc and soc2 are the same schema") "******** Messing around with ObjectIds ********" )
- oid = ObjectIdFactory( {"agent_id": {"vendor": "",
- "product": "mgmt-tool",
- "name": "myAgent1"},
- "primary_key": "key1:key2" })
- print("oid = %s" % oid)
- oid2 = ObjectIdFactory( oid.mapEncode() )
- print("oid2 = %s" % oid2)
- if oid == oid2:
- print("oid1 == oid2")
- else:
- print("oid1 != oid2")
- hashme = {oid: "myoid"}
- print("oid hash = %s" % hashme[oid2] )
- qd = QmfData( {"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
+ qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
print("qd='%s':" % qd)
print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
- print("qd map='%s'" % qd.mapEncode())
- print("qd getProperty('prop4')='%s'" % qd.getProperty("prop4"))
- qd.setProperty("prop4", 4)
- print("qd setProperty('prop4', 4)='%s'" % qd.getProperty("prop4"))
+ print("qd map='%s'" % qd.map_encode())
+ print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
+ qd.set_value("prop4", 4, "A test property called 4")
+ print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
qd.prop4 = 9
print("qd.prop4 = 9 ='%s'" % qd.prop4)
qd["prop4"] = 11
print("qd[prop4] = 11 ='%s'" % qd["prop4"])
- print("qd.mapEncode()='%s'" % qd.mapEncode())
- _qd2 = QmfDataFactory( qd.mapEncode() )
- print("_qd2.mapEncode()='%s'" % _qd2.mapEncode())
+ print("qd.mapEncode()='%s'" % qd.map_encode())
+ _qd2 = QmfData( _map = qd.map_encode() )
+ print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
- _qmfDesc1 = QmfDescribed( _schemaId = _soc.getClassId(),
- _props = {"prop1": 1, "statistics": 666, "prop2": 0})
+ _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
+ "prop2": 0}},
+ agent="some agent name?",
+ _schema = _soc)
- print("_qmfDesc1 map='%s'" % _qmfDesc1.mapEncode())
+ print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
- _qmfDesc1.setSchema( _soc )
+ _qmfDesc1._set_schema( _soc )
- print("_qmfDesc1 props{} = '%s'" % _qmfDesc1.getProperties())
- print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.getPrimaryKey())
- print("_qmfDesc1 classid = '%s'" % _qmfDesc1.getSchemaClassId())
+ print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
+ print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
+ print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
- _qmfDescMap = _qmfDesc1.mapEncode()
+ _qmfDescMap = _qmfDesc1.map_encode()
print("_qmfDescMap='%s'" % _qmfDescMap)
- _qmfDesc2 = QmfDescribedFactory( _qmfDescMap, _schema=_soc )
- print("_qmfDesc2 map='%s'" % _qmfDesc2.mapEncode())
- print("_qmfDesc2 props = '%s'" % _qmfDesc2.getProperties())
- print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.getPrimaryKey())
+ _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
- _qmfMgd1 = QmfManaged( _agentId=AgentId("", "anAgent", "tross"),
- _schema = _soc,
- _schemaId = _soc.getClassId(),
- _props = {"prop1": 11, "prop2": 10, "statistics":999})
- print("_qmfMgd1 map='%s'" % _qmfMgd1.mapEncode())
- print("_qmfMgd1.getObjectId()='%s'" % _qmfMgd1.getObjectId())
- print("_qmfMgd1 props = '%s'" % _qmfMgd1.getProperties())
- _qmfMgd1Map = _qmfMgd1.mapEncode()
- print("_qmfMgd1Map='%s'" % _qmfMgd1Map)
- _qmfMgd2 = QmfManagedFactory( param=_qmfMgd1.mapEncode(), _schema=_soc )
- print("_qmfMgd2 map='%s'" % _qmfMgd2.mapEncode())
- print("_qmfMgd2 getObjectId() = '%s'" % _qmfMgd2.getObjectId())
- print("_qmfMgd2 props = '%s'" % _qmfMgd2.getProperties())
+ print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
+ print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
+ print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) "******** Messing around with QmfEvents ********" )
_qmfevent1 = QmfEvent( _timestamp = 1111,
- _agentId = AgentId("", "whizzbang2000", "ted"),
- _schema = _sec,
- _props = {"Argument-1": 77,
- "Argument-3": True,
- "Argument-2": "a string"})
- print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.mapEncode())
- print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.getTimestamp())
- print("_qmfevent1.getAgentId()='%s'" % _qmfevent1.getAgentId())
- _qmfevent1Map = _qmfevent1.mapEncode()
- _qmfevent2 = QmfEvent(_map=_qmfevent1Map)
- print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode())
+ _schema = _sec,
+ _values = {"Argument-1": 77,
+ "Argument-3": True,
+ "Argument-2": "a string"})
+ print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
+ print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
+ _qmfevent1Map = _qmfevent1.map_encode()
+ _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
+ print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) "******** Messing around with Queries ********" )
diff --git a/qpid/python/qmf/test/ b/qpid/python/qmf/test/
index d413358dd8..9683127a6e 100644
--- a/qpid/python/qmf/test/
+++ b/qpid/python/qmf/test/
@@ -4,10 +4,8 @@ from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty,
- SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed,
- QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
- QmfEvent, SchemaMethod, Notifier)
+from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
+ QmfEvent, SchemaMethod, Notifier, SchemaClassId)
from qmfAgent import (Agent, QmfAgentData)
@@ -20,9 +18,9 @@ class ExampleNotifier(Notifier):
def waitForWork(self):
- logging.error("Waiting for event...")
+ print("Waiting for event...")
- logging.error("...event present")
+ print("...event present")
@@ -31,58 +29,54 @@ class ExampleNotifier(Notifier):
_notifier = ExampleNotifier()
-_agent = Agent( "", "qmf", "testAgent", _notifier )
+_agent = Agent( "qmf.testAgent", _notifier=_notifier )
# Dynamically construct a class schema
-_schema = SchemaObjectClass( "MyPackage", "MyClass",
- desc="A test data schema",
- _pkey=["index1", "index2"] )
+_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
# add properties
-_schema.addProperty( "index1",
- SchemaProperty(qmfTypes.TYPE_UINT8))
-_schema.addProperty( "index2",
- SchemaProperty(qmfTypes.TYPE_LSTR))
+_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
# these two properties are statistics
-_schema.addProperty( "query_count",
- SchemaProperty(qmfTypes.TYPE_UINT32))
-_schema.addProperty( "method_call_count",
- SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
# These two properties can be set via the method call
-_schema.addProperty( "set_string",
- SchemaProperty(qmfTypes.TYPE_LSTR))
-_schema.addProperty( "set_int",
- SchemaProperty(qmfTypes.TYPE_UINT32))
+_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
# add method
_meth = SchemaMethod( _desc="Method to set string and int in object." )
-_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
-_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
-_schema.addMethod( "set_meth", _meth )
+_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+_schema.add_method( "set_meth", _meth )
# Add schema to Agent
# instantiate managed data objects matching the schema
-_obj = QmfAgentData( _agent, _schema )
-_obj.setProperty("index1", 100)
-_obj.setProperty("index2", "a name" )
-_obj.setProperty("set_string", "UNSET")
-_obj.setProperty("set_int", 0)
-_obj.setProperty("query_count", 0)
-_obj.setProperty("method_call_count", 0)
-_agent.addObject( _obj )
-_agent.addObject( QmfAgentData( _agent, _schema,
- _props={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
+_obj = QmfAgentData( _agent, _schema=_schema )
+_obj.set_value("index1", 100)
+_obj.set_value("index2", "a name" )
+_obj.set_value("set_string", "UNSET")
+_obj.set_value("set_int", 0)
+_obj.set_value("query_count", 0)
+_obj.set_value("method_call_count", 0)
+_agent.add_object( _obj )
+_agent.add_object( QmfAgentData( _agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
## Now connect to the broker
@@ -100,18 +94,18 @@ while not _done:
while _wi:
print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _agent.getNextWorkitem(timeout=0)
+ _wi = _agent.getNextWorkItem(timeout=0)
- "shutting down..." )
+ print( "shutting down..." )
_done = True "Removing connection... TBD!!!" )
+print( "Removing connection... TBD!!!" )
#_myConsole.remove_connection( _c, 10 ) "Destroying agent... TBD!!!" )
+print( "Destroying agent... TBD!!!" )
#_myConsole.destroy( 10 ) "******** agent test done ********" )
+print( "******** agent test done ********" )
diff --git a/qpid/python/qmf/test/ b/qpid/python/qmf/test/
index e649b2b8e4..6db515dc99 100644
--- a/qpid/python/qmf/test/
+++ b/qpid/python/qmf/test/
@@ -4,7 +4,7 @@ from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (Notifier, QmfQuery)
+from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass)
from qmfConsole import Console
@@ -16,18 +16,18 @@ class ExampleNotifier(Notifier):
def waitForWork(self):
- logging.error("Waiting for event...")
+ print("Waiting for event...")
- logging.error("...event present")
+ print("...event present")
logging.getLogger().setLevel(logging.INFO) "Starting Connection" )
+print( "Starting Connection" )
_c = Connection("localhost")
_c.connect() "Starting Console" )
+print( "Starting Console" )
_notifier = ExampleNotifier()
_myConsole = Console(notifier=_notifier)
@@ -40,30 +40,65 @@ _myConsole.addConnection( _c )
_query = {QmfQuery._TARGET:
- {QmfQuery._LOGIC_AND:
- [{QmfQuery._CMP_EQ: ["vendor", ""]},
- {QmfQuery._CMP_EQ: ["product", "qmf"]}]}}
+ {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}}
_query = QmfQuery(_query)
_done = False
while not _done:
- try:
- _notifier.waitForWork()
- _wi = _myConsole.get_next_workitem(timeout=0)
- while _wi:
- print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams())))
- _wi = _myConsole.get_next_workitem(timeout=0)
- except:
- "shutting down..." )
- _done = True
- "Removing connection" )
+# try:
+ _notifier.waitForWork()
+ _wi = _myConsole.getNextWorkItem(timeout=0)
+ while _wi:
+ print("!!! work item received %d:%s" % (_wi.getType(),
+ str(_wi.getParams())))
+ if _wi.getType() == _wi.AGENT_ADDED:
+ _agent = _wi.getParams().get("agent")
+ if not _agent:
+ print("!!!! AGENT IN REPLY IS NULL !!! ")
+ _query = QmfQuery( {QmfQuery._TARGET:
+ {QmfQuery._TARGET_PACKAGES:None}} )
+ _reply = _myConsole.doQuery(_agent, _query)
+ package_list = _reply.get(MsgKey.package_info)
+ for pname in package_list:
+ print("!!! Querying for schema from package: %s" % pname)
+ _query = QmfQuery({QmfQuery._TARGET:
+ {QmfQuery._TARGET_SCHEMA_ID:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE, pname]}})
+ _reply = _myConsole.doQuery(_agent, _query)
+ schema_id_list = _reply.get(MsgKey.schema_id)
+ for sid_map in schema_id_list:
+ _query = QmfQuery({QmfQuery._TARGET:
+ {QmfQuery._TARGET_SCHEMA:None},
+ QmfQuery._PREDICATE:
+ {QmfQuery._CMP_EQ:
+ [SchemaClass.KEY_SCHEMA_ID, sid_map]}})
+ _reply = _myConsole.doQuery(_agent, _query)
+ _myConsole.releaseWorkItem(_wi)
+ _wi = _myConsole.getNextWorkItem(timeout=0)
+# except:
+# "shutting down..." )
+# _done = True
+print( "Removing connection" )
_myConsole.removeConnection( _c, 10 ) "Destroying console:" )
+print( "Destroying console:" )
_myConsole.destroy( 10 ) "******** console test done ********" )
+print( "******** console test done ********" )