From c7e1aca1e90bacbf7ef579ffce686f187243b3c7 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 11 Jan 2010 20:46:07 +0000 Subject: QPID-2261 - Patch from Ken Giusti committed to the qmfv2 branch. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@898057 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qmf/qmfAgent.py | 290 ++++-- qpid/python/qmf/qmfCommon.py | 1666 ++++++++++++---------------------- qpid/python/qmf/qmfConsole.py | 602 ++++++------ qpid/python/qmf/test/agent_test.py | 88 +- qpid/python/qmf/test/console_test.py | 79 +- 5 files changed, 1199 insertions(+), 1526 deletions(-) diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 4b76263903..278eafedbc 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -24,11 +24,10 @@ import time from threading import Thread, Lock from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, - AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject, - parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, - QmfData) - +from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass) ##============================================================================== @@ -36,21 +35,18 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, ##============================================================================== class Agent(Thread): - def __init__(self, vendor, product, name=None, - notifier=None, heartbeat_interval=30, - kwargs={}): + def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, + _max_msg_size=0, _capacity=10): Thread.__init__(self) self._running = False - self.vendor = vendor - self.product = product - if name: - self.name = name - else: - self.name = uuid4().get_urn().split(":")[2] - self._id = AgentId(self.vendor, self.product, self.name) - self._address = str(self._id) - self._notifier = notifier - self._heartbeat_interval = heartbeat_interval + + self.name = str(name) + self._domain = _domain + self._notifier = _notifier + self._heartbeat_interval = _heartbeat_interval + self._max_msg_size = _max_msg_size + self._capacity = _capacity + self._conn = None self._session = None self._lock = Lock() @@ -59,33 +55,48 @@ class Agent(Thread): self._schema = {} self._agent_data = {} - def getAgentId(self): - return AgentId(self.vendor, self.product, self.name) + def get_name(self): + return self.name def setConnection(self, conn): + my_addr = QmfAddress.direct(self.name, self._domain) + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + + logging.debug("my direct addr=%s" % my_addr) + logging.debug("agent.locate addr=%s" % locate_addr) + logging.debug("agent.ind addr=%s" % ind_addr) + self._conn = conn self._session = self._conn.session() - self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10) - self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address, - capacity=10) - self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) + self._direct_receiver = self._session.receiver(str(my_addr) + + ";{create:always," + " node-properties:" + " {type:topic, x-properties: {type:direct}}}", + capacity=self._capacity) + self._locate_receiver = self._session.receiver(str(locate_addr) + + ";{create:always, node-properties:{type:topic}}", + capacity=self._capacity) + self._ind_sender = self._session.sender(str(ind_addr) + + ";{create:always, node-properties:{type:topic}}") + self._running = True self.start() - def registerObjectClass(self, schema): + def register_object_class(self, schema): """ - Register an instance of a SchemaObjectClass with this agent + Register an instance of a SchemaClass with this agent """ # @todo: need to update subscriptions # @todo: need to mark schema as "non-const" - if not isinstance(schema, SchemaObjectClass): - raise TypeError("SchemaObjectClass instance expected") + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass instance expected") self._lock.acquire() try: - classId = schema.getClassId() - pname = classId.getPackageName() - cname = classId.getClassName() + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() if pname not in self._packages: self._packages[pname] = [cname] else: @@ -96,14 +107,13 @@ class Agent(Thread): finally: self._lock.release() - - def registerEventClass(self, cls): - logging.error("!!!Agent.registerEventClass() TBD!!!") + def register_event_class(self, schema): + return self.register_object_class(schema) def raiseEvent(self, qmfEvent): logging.error("!!!Agent.raiseEvent() TBD!!!") - def addObject(self, data ): + def add_object(self, data ): """ Register an instance of a QmfAgentData object. """ @@ -112,9 +122,13 @@ class Agent(Thread): if not isinstance(data, QmfAgentData): raise TypeError("QmfAgentData instance expected") + id_ = data.get_object_id() + if not id_: + raise TypeError("No identifier assigned to QmfAgentData!") + self._lock.acquire() try: - self._agent_data[data.getObjectId()] = data + self._agent_data[id_] = data finally: self._lock.release() @@ -147,37 +161,32 @@ class Agent(Thread): while self._running: now = datetime.datetime.utcnow() - print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) + # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) if now >= next_heartbeat: self._ind_sender.send(self._makeAgentIndMsg()) logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds - print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) + # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) try: - logging.error("waiting for next rcvr (timeout=%s)..." % timeout) - self._session.next_receiver(timeout = timeout) + self._session.next_receiver(timeout=timeout) except Empty: - pass - except KeyboardInterrupt: - break + continue try: msg = self._locate_receiver.fetch(timeout = 0) - if msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) except Empty: - pass + msg = None + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) try: msg = self._direct_receiver.fetch(timeout = 0) - if msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) except Empty: - pass - - + msg = None + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) # # Private: @@ -187,8 +196,8 @@ class Agent(Thread): """ Create an agent indication message identifying this agent """ - _map = self.getAgentId().mapEncode() - _map["schemaTimestamp"] = self._schema_timestamp + _map = {"_name": self.get_name(), + "_schema_timestamp": self._schema_timestamp} return Message( subject=makeSubject(OpCode.agent_ind), properties={"method":"response"}, content={MsgKey.agent_info: _map}) @@ -241,9 +250,11 @@ class Agent(Thread): reply = True if "method" in props and props["method"] == "request": - if MsgKey.query in cmap: - agentIdMap = self.getAgentId().mapEncode() - reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap)) + query = cmap.get(MsgKey.query) + if query is not None: + # fake a QmfData containing my identifier for the query compare + tmpData = QmfData(_values={"_name": self.get_name()}) + reply = QmfQuery(query).evaluate(tmpData) if reply: try: @@ -272,10 +283,9 @@ class Agent(Thread): if target == QmfQuery._TARGET_PACKAGES: self._queryPackages( msg, query ) elif target == QmfQuery._TARGET_SCHEMA_ID: - self._querySchemaId( msg, query ) + self._querySchema( msg, query, _idOnly=True ) elif target == QmfQuery._TARGET_SCHEMA: - logging.warning("!!! Query TARGET=SCHEMA TBD !!!") - #self._querySchema( query.getPredicate(), _idOnly=False ) + self._querySchema( msg, query) elif target == QmfQuery._TARGET_AGENT: logging.warning("!!! Query TARGET=AGENT TBD !!!") elif target == QmfQuery._TARGET_OBJECT_ID: @@ -294,7 +304,7 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})): + if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})): pnames.append(name) finally: self._lock.release() @@ -312,23 +322,32 @@ class Agent(Thread): logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) - def _querySchemaId( self, msg, query ): + def _querySchema( self, msg, query, _idOnly=False ): """ """ schemas = [] self._lock.acquire() try: - for schemaId in self._schema.iterkeys(): - if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})): - schemas.append(schemaId.mapEncode()) + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) finally: self._lock.release() try: tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + m = Message( subject=makeSubject(OpCode.data_ind), properties={"method":"response"}, - content={MsgKey.schema_id: schemas} ) + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id tmp_snd.send(m) @@ -340,35 +359,45 @@ class Agent(Thread): ##============================================================================== - ## OBJECTS + ## DATA MODEL ##============================================================================== -class QmfAgentData(QmfManaged): +class QmfAgentData(QmfData): """ A managed data object that is owned by an agent. """ - def __init__(self, _agent, _schema, _props={}): - """ - @type _agent: class Agent - @param _agent: the agent that manages this object. - @type _schema: class SchemaObjectClass - @param _schema: the schema used to describe this data object - @type _props: map of "name"= pairs - @param _props: initial values for all properties in this object - """ - super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(), - _schema=_schema, - _props=_props) + + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, + _tag=_tag, _ctime=ctime, + _utime=ctime, _object_id=_object_id, + _schema=_schema, _const=False) + self._agent = agent def destroy(self): - self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000) + self._dtime = long(time.time() * 1000) # @todo: publish change - def setProperty( self, _name, _value): - super(QmfAgentData, self).setProperty(_name, _value) + def is_deleted(self): + return self._dtime == 0 + + def set_value(self, _name, _value, _subType=None): + super(QmfAgentData, self).set_value(_name, _value, _subType) # @todo: publish change + def inc_value(self, name, delta): + """ add the delta to the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") + + def dec_value(self, name, delta): + """ subtract the delta from the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") ################################################################################ @@ -377,22 +406,87 @@ class QmfAgentData(QmfManaged): ################################################################################ if __name__ == '__main__': - import time + # static test cases - no message passing, just exercise API + from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes, + SchemaMethod, SchemaEventClass) + logging.getLogger().setLevel(logging.INFO) - logging.info( "Starting Connection" ) - _c = Connection("localhost") - _c.connect() - #c.start() - logging.info( "Starting Agent" ) - _agent = Agent("redhat.com", "agent", "tross") - _agent.setConnection(_c) + logging.info( "Create an Agent" ) + _agent_name = AgentName("redhat.com", "agent", "tross") + _agent = Agent(str(_agent_name)) - logging.info( "Running Agent" ) - - while True: - try: - time.sleep(10) - except KeyboardInterrupt: - break - + logging.info( "Get agent name: '%s'" % _agent.get_name()) + + logging.info( "Create SchemaObjectClass" ) + + _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"]) + # add properties + _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call + _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod(_desc="Method to set string and int in object." ) + _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + print("Schema Map='%s'" % str(_schema.map_encode())) + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + logging.info( "Create QmfAgentData" ) + + _obj = QmfAgentData( _agent, _schema=_schema ) + _obj.set_value("index1", 100) + _obj.set_value("index2", "a name" ) + _obj.set_value("set_string", "UNSET") + _obj.set_value("set_int", 0) + _obj.set_value("query_count", 0) + _obj.set_value("method_call_count", 0) + + print("Obj1 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object( _obj ) + + _obj = QmfAgentData( _agent, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0}, + _schema=_schema) + + print("Obj2 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object(_obj) + + ############## + + + + logging.info( "Create SchemaEventClass" ) + + _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test data schema", + _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)}) + _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + print("Event Map='%s'" % str(_event.map_encode())) + + _agent.register_event_class(_event) diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index 6e79b1c3f4..9bcb166c6b 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -34,11 +34,10 @@ except ImportError: ## Constants ## -AMQP_QMF_TOPIC = "amq.topic" -AMQP_QMF_DIRECT = "amq.direct" -AMQP_QMF_NAME_SEPARATOR = "/" -AMQP_QMF_AGENT_LOCATE = "amq.topic/agent.locate" -AMQP_QMF_AGENT_INDICATION = "amq.topic/agent.ind" + +AMQP_QMF_AGENT_LOCATE = "agent.locate" +AMQP_QMF_AGENT_INDICATION = "agent.ind" + AMQP_QMF_SUBJECT = "qmf" AMQP_QMF_VERSION = 4 @@ -49,6 +48,7 @@ class MsgKey(object): query = "query" package_info = "package_info" schema_id = "schema_id" + schema = "schema" class OpCode(object): @@ -108,26 +108,71 @@ class Notifier(object): ##============================================================================== -## Agent Identification +## Addressing ##============================================================================== -class AgentId(object): +class QmfAddress(object): """ - Uniquely identifies a management agent within the entire management domain. - - Map format: - map["vendor"] = str, name of vendor of the agent - map["product"] = str, name of product using agent - map["name"] = str, name of agent, unique within vendor and product. + TBD + """ + TYPE_DIRECT = "direct" + TYPE_TOPIC = "topic" + + ADDRESS_FMT = "qmf.%s.%s/%s" + DEFAULT_DOMAIN = "default" + + + def __init__(self, name, domain, type_): + self._name = name + self._domain = domain + self._type = type_ + + def _direct(cls, name, _domain=None): + if _domain is None: + _domain = QmfAddress.DEFAULT_DOMAIN + return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT) + direct = classmethod(_direct) + + def _topic(cls, name, _domain=None): + if _domain is None: + _domain = QmfAddress.DEFAULT_DOMAIN + return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC) + topic = classmethod(_topic) + + + def get_address(self): + return str(self) + + def __repr__(self): + return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name) + + + + +class AgentName(object): + """ + Uniquely identifies a management agent within the management domain. """ _separator = ":" - def __init__(self, vendor, product, name): + + def __init__(self, vendor, product, name, str_=None): """ Note: this object must be immutable, as it is used to index into a dictionary """ - self._vendor = vendor - self._product = product - self._name = name + if str_: + # construct from string representation + if _str.count(AgentId._separator) < 2: + raise TypeError("AgentId string format must be 'vendor.product.name'") + self._vendor, self._product, self._name = param.split(AgentId._separator) + else: + self._vendor = vendor + self._product = product + self._name = name + + + def _from_str(cls, str_): + return cls(None, None, None, str_=str_) + from_str = classmethod(_from_str) def vendor(self): return self._vendor @@ -138,15 +183,8 @@ class AgentId(object): def name(self): return self._name - def mapEncode(self): - _map = {} - _map["vendor"] = self._vendor - _map["product"] = self._product - _map["name"] = self._name - return _map - def __cmp__(self, other): - if not isinstance(other, AgentId) : + if not isinstance(other, AgentName) : raise TypeError("Invalid types for compare") # return 1 me = str(self) @@ -162,40 +200,10 @@ class AgentId(object): return (self._vendor, self._product, self._name).__hash__() def __repr__(self): - return self._vendor + AgentId._separator + \ - self._product + AgentId._separator + \ + return self._vendor + AgentName._separator + \ + self._product + AgentName._separator + \ self._name - def __str__(self): - return self.__repr__() - - - -def AgentIdFactory( param ): - """ - Factory for constructing an AgentId class from various sources. - - @type param: various - @param param: object to use for constructing AgentId - @rtype: AgentId - @returns: a new AgentId instance - """ - if type(param) == str: - if param.count(AgentId._separator) < 2: - raise TypeError("AgentId string format must be 'vendor:product:name'") - vendor, product, name = param.split(AgentId._separator) - return AgentId(vendor, product, name) - if type(param) == dict: - # construct from map encoding - if not "vendor" in param: - raise TypeError("requires 'vendor' value") - if not "product" in param: - raise TypeError("requires 'product' value") - if not "name" in param: - raise TypeError("requires 'name' value") - return AgentId( param["vendor"], param["product"], param["name"] ) - raise TypeError("Invalid type for AgentId construction") - ##============================================================================== @@ -203,451 +211,293 @@ def AgentIdFactory( param ): ##============================================================================== - -class ObjectId(object): - """ - An instance of managed object must be uniquely identified within the - management system. Each managed object is given a key that is unique - within the domain of the object's managing Agent. Note that these - keys are not unique across Agents. Therefore, a globally unique name - for an instance of a managed object is the concatenation of the - object's key and the managing Agent's AgentId. - - Map format: - map["agent_id"] = map representation of AgentId - map["primary_key"] = str, key for managed object - """ - def __init__(self, agentid, name): - if not isinstance(agentid, AgentId): - raise TypeError("requires an AgentId class") - self._agentId = agentid; - self._name = name; - - def getAgentId(self): - """ - @rtype: class AgentId - @returns: Id of agent that manages the object. - """ - return self._agentId - - def getPrimaryKey(self): - """ - @rtype: str - @returns: Key of managed object. - """ - return self._name - - def mapEncode(self): - _map = {} - _map["agent_id"] = self._agentId.mapEncode() - _map["primary_key"] = self._name - return _map - - def __repr__(self): - return "%s:%s" % (self._agentId, self._name) - - def __cmp__(self, other): - if not isinstance(other, ObjectId) : - raise TypeError("Invalid types for compare") - - if self._agentId < other._agentId: - return -1 - if self._agentId > other._agentId: - return 1 - if self._name < other._name: - return -1 - if self._name > other._name: - return 1 - return 0 - - def __hash__(self): - return (hash(self._agentId), self._name).__hash__() - - -def ObjectIdFactory( param ): - """ - Factory for constructing ObjectIds from various sources - - @type param: various - @param param: object to use for constructing ObjectId - @rtype: ObjectId - @returns: a new ObjectId instance +class _mapEncoder(object): + """ + virtual base class for all objects that support being converted to a map """ - if type(param) == dict: - # construct from map - if "agent_id" not in param: - raise TypeError("requires 'agent_id' value") - if "primary_key" not in param: - raise TypeError("requires 'primary_key' value") - - return ObjectId( AgentIdFactory(param["agent_id"]), param["primary_key"] ) - - else: - raise TypeError("Invalid type for ObjectId construction") + def map_encode(self): + raise Exception("The map_encode method my be overridden.") - -class QmfData(object): +class QmfData(_mapEncoder): """ Base data class representing arbitrarily structure data. No schema or managing agent is associated with data of this class. Map format: - map["properties"] = map of unordered "name"= pairs (optional) + map["_values"] = map of unordered "name"= pairs (optional) + map["_subtype"] = map of unordered "name"="subtype string" pairs (optional) + map["_tag"] = application-specific tag for this instance (optional) """ - def __init__(self, _props={}, _const=False): - """ - @type _props: dict - @param _props: dictionary of initial name=value pairs for object's property data. + KEY_VALUES = "_values" + KEY_SUBTYPES = "_subtypes" + KEY_TAG="_tag" + KEY_OBJECT_ID = "_object_id" + KEY_SCHEMA_ID = "_schema_id" + KEY_UPDATE_TS = "_update_ts" + KEY_CREATE_TS = "_create_ts" + KEY_DELETE_TS = "_delete_ts" + + def __init__(self, + _values={}, _subtypes={}, _tag=None, _object_id=None, + _ctime = 0, _utime = 0, _dtime = 0, + _map=None, + _schema=None, _const=False): + """ + @type _values: dict + @param _values: dictionary of initial name=value pairs for object's + named data. + @type _subtypes: dict + @param _subtype: dictionary of subtype strings for each of the object's + named data. + @type _desc: string + @param _desc: Human-readable description of this data object. @type _const: boolean @param _const: if true, this object cannot be modified """ - self._properties = _props.copy() + self._schema_id = None + if _map is not None: + # construct from map + _tag = _map.get(self.KEY_TAG, _tag) + _values = _map.get(self.KEY_VALUES, _values) + _subtypes = _map.get(self.KEY_SUBTYPES, _subtypes) + _object_id = _map.get(self.KEY_OBJECT_ID, _object_id) + sid = _map.get(self.KEY_SCHEMA_ID) + if sid: + self._schema_id = SchemaClassId(_map=sid) + _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime)) + _utime = long(_map.get(self.KEY_UPDATE_TS, _utime)) + _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime)) + + self._values = _values.copy() + self._subtypes = _subtypes.copy() + self._tag = _tag + self._ctime = _ctime + self._utime = _utime + self._dtime = _dtime self._const = _const - self._managed = False # not managed by an Agent - self._described = False # not described by a schema - - def isManaged(self): - return self._managed - def isDescribed(self): - return self._described - - def getProperties(self): - return self._properties.copy() + if _object_id is not None: + self._object_id = str(_object_id) + else: + self._object_id = None - def getProperty(self, _name): - return self._properties[_name] + if _schema is not None: + self._set_schema(_schema) + else: + # careful: map constructor may have already set self._schema_id, do + # not override it! + self._schema = None + + def _create(cls, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None, _const=False): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + return cls(_values=values, _subtypes=_subtypes, _tag=_tag, + _ctime=ctime, _utime=ctime, + _object_id=_object_id, _schema=_schema, _const=_const) + create = classmethod(_create) + + def _from_map(cls, map_, _schema=None, _const=False): + return cls(_map=map_, _schema=_schema, _const=_const) + from_map = classmethod(_from_map) + + def is_managed(self): + return self._object_id is not None + + def is_described(self): + return self._schema_id is not None + + def get_tag(self): + return self._tag + + def get_value(self, name): + # meta-properties: + if name == SchemaClassId.KEY_PACKAGE: + if self._schema_id: + return self._schema_id.get_package_name() + return None + if name == SchemaClassId.KEY_CLASS: + if self._schema_id: + return self._schema_id.get_class_name() + return None + if name == SchemaClassId.KEY_TYPE: + if self._schema_id: + return self._schema_id.get_type() + return None + if name == SchemaClassId.KEY_HASH: + if self._schema_id: + return self._schema_id.get_hash_string() + return None + if name == self.KEY_SCHEMA_ID: + return self._schema_id + if name == self.KEY_OBJECT_ID: + return self._object_id + if name == self.KEY_TAG: + return self._tag + if name == self.KEY_UPDATE_TS: + return self._utime + if name == self.KEY_CREATE_TS: + return self._ctime + if name == self.KEY_DELETE_TS: + return self._dtime + + return self._values.get(name) + + def has_value(self, name): + + if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS, + SchemaClassId.KEY_TYPE, SchemaClassId.KEY_HASH, + self.KEY_SCHEMA_ID]: + return self._schema_id is not None + if name in [self.KEY_UPDATE_TS, self.KEY_CREATE_TS, + self.KEY_DELETE_TS]: + return True + if name == self.KEY_OBJECT_ID: + return self._object_id is not None + if name == self.KEY_TAG: + return self._tag is not None - def hasProperty(self, _name): - return _name in self._properties + return name in self._values - def setProperty(self, _name, _value): + def set_value(self, _name, _value, _subType=None): if self._const: raise Exception("cannot modify constant data object") - self._properties[_name] = _value + self._values[_name] = _value + if _subType: + self._subtypes[_name] = _subType return _value - def mapEncode(self): - return self._properties.copy() - - def __repr__(self): - return str(self.mapEncode()) - - def __setattr__(self, _name, _value): - # ignore private data members - if _name[0] == '_': - return super.__setattr__(self, _name, _value) - if _name in self._properties: - return self.setProperty(_name, _value) - return super.__setattr__(self, _name, _value) - - def __getattr__(self, _name): - if _name in self._properties: return self.getProperty(_name) - raise AttributeError("no item named '%s' in this object" % _name) - - def __getitem__(self, _name): - return self.__getattr__(_name) - - def __setitem__(self, _name, _value): - return self.__setattr__(_name, _value) - - -def QmfDataFactory( param, const=False ): - """ - Factory for constructing an QmfData class from various sources. - - @type param: various - @param param: object to use for constructing QmfData instance - @rtype: QmfData - @returns: a new QmfData instance - """ - if type(param) == dict: - # construct from map - return QmfData( _props=param, _const=const ) - - else: - raise TypeError("Invalid type for QmfData construction") - + def get_subtype(self, _name): + return self._subtypes.get(_name) - -class QmfDescribed(QmfData): - """ - Data that has a formally defined structure is represented by the - QmfDescribed class. This class extends the QmfData class by - associating the data with a formal schema (SchemaObjectClass). - - Map format: - map["schema_id"] = map representation of a SchemaClassId instance - map["properties"] = map representation of a QmfData instance - """ - def __init__(self, _schema=None, _schemaId=None, _props={}, _const=False ): - """ - @type _schema: class SchemaClass or derivative - @param _schema: an instance of the schema used to describe this object. - @type _schemaId: class SchemaClassId - @param _schemaId: if schema instance not available, this is mandatory. - @type _props: dict - @param _props: dictionary of initial name=value pairs for object's property data. - @type _const: boolean - @param _const: if true, this object cannot be modified - """ - super(QmfDescribed, self).__init__(_props, _const) - self._validated = False - self._described = True - self._schema = _schema - if _schema: - self._schemaId = _schema.getClassId() - if self._const: - self._validate() - else: - if _schemaId: - self._schemaId = _schemaId - else: - raise Exception("A SchemaClass or SchemaClassId must be provided") - - - def getSchemaClassId(self): + def get_schema_class_id(self): """ @rtype: class SchemaClassId @returns: the identifier of the Schema that describes the structure of the data. """ - return self._schemaId - - def setSchema(self, _schema): - """ - @type _schema: class SchemaClass or derivative - @param _schema: instance of schema used to describe the structure of the data. - """ - if self._schemaId != _schema.getClassId(): - raise Exception("Cannot reset current schema to a different schema") - oldSchema = self._schema - self._schema = _schema - if not oldSchema and self._const: - self._validate() + return self._schema_id - def getPrimaryKey(self): + def get_object_id(self): """ - Get a string composed of the object's primary key properties. + Get the instance's identification string. @rtype: str - @returns: a string composed from primary key property values. + @returns: the identification string, or None if not assigned and id. """ + if self._object_id: + return self._object_id + + # if object id not assigned, see if schema defines a set of field + # values to use as an id if not self._schema: - raise Exception("schema not available") + return None + + ids = self._schema.get_id_names() + if not ids: + return None if not self._validated: self._validate() - if self._schema._pkeyNames == 0: - if len(self._properties) != 1: - raise Exception("no primary key defined") - return str(self._properties.values()[0]) - result = u"" - for pkey in self._schema._pkeyNames: - if result != u"": - result += u":" + for key in ids: try: - valstr = unicode(self._properties[pkey]) + result += unicode(self._values[key]) except: - valstr = u"" - result += valstr + logging.error("get_object_id(): cannot convert value '%s'." + % key) + return None + self._object_id = result return result - - def getProperty(self, name): - # meta-properties - if name == QmfQuery._PRED_PACKAGE: - return self._schemaId.getClassId().getPackageName() - if name == QmfQuery._PRED_CLASS: - return self._schemaId.getClassId().getClassName() - if name == QmfQuery._PRED_TYPE: - return self._schemaId.getClassId().getType() - if name == QmfQuery._PRED_HASH: - return self._schemaId.getClassId().getHashString() - if name == QmfQuery._PRED_SCHEMA_ID: - return self._schemaId.getClassId() - if name == QmfQuery._PRED_PRIMARY_KEY: - return self.getPrimaryKey() - - return super(QmfDescribed, self).getProperty(name) - - - def hasProperty(self, name): - if name in [QmfQuery._PRED_PACKAGE, QmfQuery._PRED_CLASS, QmfQuery._PRED_TYPE, - QmfQuery._PRED_HASH, QmfQuery._PRED_SCHEMA_ID, QmfQuery._PRED_PRIMARY_KEY]: - return True - - return super(QmfDescribed, self).hasProperty(name) - - - def mapEncode(self): + def map_encode(self): _map = {} - _map["schema_id"] = self._schemaId.mapEncode() - _map["properties"] = super(QmfDescribed, self).mapEncode() + if self._tag: + _map[self.KEY_TAG] = self._tag + + # data in the _values map may require recursive map_encode() + vmap = {} + for name,val in self._values.iteritems(): + if isinstance(val, _mapEncoder): + vmap[name] = val.map_encode() + else: + # otherwise, just toss in the native type... + vmap[name] = val + + _map[self.KEY_VALUES] = vmap + # subtypes are never complex, so safe to just copy + _map[self.KEY_SUBTYPES] = self._subtypes.copy() + if self._object_id: + _map[self.KEY_OBJECT_ID] = self._object_id + if self._schema_id: + _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode() return _map + def _set_schema(self, schema): + self._validated = False + self._schema = schema + if schema: + self._schema_id = schema.get_class_id() + if self._const: + self._validate() + else: + self._schema_id = None + def _validate(self): """ Compares this object's data against the associated schema. Throws an exception if the data does not conform to the schema. """ - for name,val in self._schema._properties.iteritems(): + props = self._schema.get_properties() + for name,val in props.iteritems(): # @todo validate: type compatible with amqp_type? # @todo validate: primary keys have values - if name not in self._properties: + if name not in self._values: if val._isOptional: # ok not to be present, put in dummy value # to simplify access - self._properties[name] = None + self._values[name] = None else: raise Exception("Required property '%s' not present." % name) self._validated = True + def __repr__(self): + return "QmfData=<<" + str(self.map_encode()) + ">>" + + def __setattr__(self, _name, _value): + # ignore private data members + if _name[0] == '_': + return super(QmfData, self).__setattr__(_name, _value) + if _name in self._values: + return self.set_value(_name, _value) + return super(QmfData, self).__setattr__(_name, _value) -def QmfDescribedFactory( param, _schema=None ): - """ - Factory for constructing an QmfDescribed class from various sources. - - @type param: various - @param param: object to use for constructing QmfDescribed instance - @type _schema: SchemaClass - @param _schema: instance of the SchemaClass that describes this instance - @rtype: QmfDescribed - @returns: a new QmfDescribed instance - """ - if type(param) == dict: - # construct from map - if "schema_id" not in param: - raise TypeError("requires 'schema_id' value") - if "properties" not in param: - raise TypeError("requires 'properties' value") - - return QmfDescribed( _schema=_schema, _schemaId=SchemaClassIdFactory( param["schema_id"] ), - _props = param["properties"] ) - else: - raise TypeError("Invalid type for QmfDescribed construction") - - - -class QmfManaged(QmfDescribed): - """ - Data that has a formally defined structure, and for which each - instance of the data is managed by a particular Agent is represented - by the QmfManaged class. This class extends the QmfDescribed class by - associating the described data with a particular Agent. - - Map format: - map["object_id"] = map representation of an ObjectId value - map["schema_id"] = map representation of a SchemaClassId instance - map["qmf_data"] = map representation of a QmfData instance - map["timestamps"] = array of AMQP timestamps. [0] = time of last update, - [1] = creation timestamp, [2] = deletion timestamp or zero. - """ - _ts_update = 0 - _ts_create = 1 - _ts_delete = 2 - def __init__( self, _agentId=None, _schema=None, _schemaId=None, - _props={}, _const=False ): - """ - @type _agentId: class AgentId - @param _agentId: globally unique identifier of the managing agent. - @type _schema: class SchemaClass or derivative - @param _schema: an instance of the schema used to describe this object. - @type _schemaId: class SchemaClassId - @param _schemaId: if schema instance not available, this is mandatory. - @type _props: dict - @param _props: dictionary of initial name=value pairs for object's property data. - @type _const: boolean - @param _const: if true, this object cannot be modified - """ - super(QmfManaged, self).__init__(_schema=_schema, _schemaId=_schemaId, - _props=_props, _const=_const) - self._managed = True - self._agentId = _agentId - # timestamp, in millisec since epoch UTC - _ctime = long(time.time() * 1000) - self._timestamps = [_ctime,_ctime,0] - - - def getObjectId(self): - """ - @rtype: class ObjectId - @returns: the ObjectId that uniquely identifies this managed object. - """ - return ObjectId(self._agentId, self.getPrimaryKey()) - - def isDeleted(self): - return self._timestamps[QmfManaged._ts_delete] == 0 - - - def getProperty(self, name): - # meta-properties - if name == QmfQuery._PRED_OBJECT_ID: - return self.getObjectId() - if name == QmfQuery._PRED_UPDATE_TS: - return self._timestamps[QmfManaged._ts_update] - if name == QmfQuery._PRED_CREATE_TS: - return self._timestamps[QmfManaged._ts_create] - if name == QmfQuery._PRED_DELETE_TS: - return self._timestamps[QmfManaged._ts_delete] - - return super(QmfManaged, self).getProperty(name) - - - def hasProperty(self, name): - if name in [QmfQuery._PRED_OBJECT_ID, QmfQuery._PRED_UPDATE_TS, - QmfQuery._PRED_CREATE_TS, QmfQuery._PRED_DELETE_TS]: - return True - - return super(QmfManaged, self).hasProperty(name) - - - def mapEncode(self): - _map = super(QmfManaged, self).mapEncode() - _map["agent_id"] = self._agentId.mapEncode() - _map["timestamps"] = self._timestamps[:] - return _map - + def __getattr__(self, _name): + if _name != "_values" and _name in self._values: + return self._values[_name] + raise AttributeError("no value named '%s' in this object" % _name) + def __getitem__(self, _name): + return self.__getattr__(_name) -def QmfManagedFactory( param, _schema=None ): - """ - Factory for constructing an QmfManaged instance from various sources. - - @type param: various - @param param: object to use for constructing QmfManaged instance - @type _schema: SchemaClass - @param _schema: instance of the SchemaClass that describes this instance - @rtype: QmfManaged - @returns: a new QmfManaged instance - """ - if type(param) == dict: - # construct from map - if "agent_id" not in param: - raise TypeError("requires 'agent_id' value") - _qd = QmfDescribedFactory( param, _schema ) - - return QmfManaged( _agentId = AgentIdFactory(param["agent_id"]), - _schema=_schema, _schemaId=_qd._schemaId, - _props = _qd._properties ) - else: - raise TypeError("Invalid type for QmfManaged construction") + def __setitem__(self, _name, _value): + return self.__setattr__(_name, _value) -class QmfEvent(QmfDescribed): +class QmfEvent(QmfData): """ A QMF Event is a type of described data that is not managed. Events are notifications that are sent by Agents. An event notifies a Console of a change in some aspect of the system under managment. """ - def __init__(self, _map=None, - _timestamp=None, _agentId=None, - _schema=None, _schemaId=None, - _props={}, _const=False): + KEY_TIMESTAMP = "_timestamp" + + def __init__(self, _timestamp=None, _values={}, _subtypes={}, _tag=None, + _map=None, + _schema=None, _const=True): """ @type _map: dict @param _map: if not None, construct instance from map representation. @@ -661,240 +511,52 @@ class QmfEvent(QmfDescribed): @type _schemaId: class SchemaClassId (event) @param _schemaId: identi """ - if _map: - if type(_map) != dict: - raise TypeError("parameter '_map' must be of type 'dict'") - if "timestamp" not in _map: - pass - if "agent_id" not in _map: - pass - _qe = QmfDescribedFactory( _map, _schema ) - super(QmfEvent, self).__init__( _schema=_qe._schema, _schemaId=_qe._schemaId, - _props=_qe._properties, _const=_qe._const ) - self._timestamp = long(_map["timestamp"]) - self._agentId = AgentIdFactory(_map["agent_id"]) + + if _map is not None: + # construct from map + super(QmfEvent, self).__init__(_map=_map, _schema=_schema, + _const=_const) + _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp) else: - super(QmfEvent, self).__init__(_schema=_schema, _schemaId=_schemaId, - _props=_props, _const=_const) + super(QmfEvent, self).__init__(_values=_values, + _subtypes=_subtypes, _tag=_tag, + _schema=_schema, _const=_const) + if _timestamp is None: + raise TypeError("QmfEvent: a valid timestamp is required.") + + try: self._timestamp = long(_timestamp) - self._agentId = _agentId + except: + raise TypeError("QmfEvent: a numeric timestamp is required.") + + def _create(cls, timestamp, values, + _subtypes={}, _tag=None, _schema=None, _const=False): + return cls(_timestamp=timestamp, _values=values, _subtypes=_subtypes, + _tag=_tag, _schema=_schema, _const=_const) + create = classmethod(_create) - def getTimestamp(self): return self._timestamp + def _from_map(cls, map_, _schema=None, _const=False): + return cls(_map=map_, _schema=_schema, _const=_const) + from_map = classmethod(_from_map) - def getAgentId(self): return self._agentId + def get_timestamp(self): + return self._timestamp - def mapEncode(self): - _map = super(QmfEvent, self).mapEncode() - _map["timestamp"] = self._timestamp - _map["agent_id"] = self._agentId.mapEncode() + def map_encode(self): + _map = super(QmfEvent, self).map_encode() + _map[self.KEY_TIMESTAMP] = self._timestamp return _map + + #============================================================================== #============================================================================== #============================================================================== -class QmfObject(object): - # attr_reader :impl, :object_class - def __init__(self, cls, kwargs={}): - pass -# self._cv = Condition() -# self._sync_count = 0 -# self._sync_result = None -# self._allow_sets = False -# if kwargs.has_key("broker"): -# self._broker = kwargs["broker"] -# else: -# self._broker = None -# if cls: -# self.object_class = cls -# self.impl = qmfengine.Object(self.object_class.impl) -# elif kwargs.has_key("impl"): -# self.impl = qmfengine.Object(kwargs["impl"]) -# self.object_class = SchemaObjectClass(None, -# None, -# {"impl":self.impl.getClass()}) -# else: -# raise Exception("Argument error: required parameter ('impl') not supplied") - - -# def destroy(self): -# self.impl.destroy() - - -# def object_id(self): -# return ObjectId(self.impl.getObjectId()) - - -# def set_object_id(self, oid): -# self.impl.setObjectId(oid.impl) - - -# def properties(self): -# list = [] -# for prop in self.object_class.properties: -# list.append([prop, self.get_attr(prop.name())]) -# return list - - -# def statistics(self): -# list = [] -# for stat in self.object_class.statistics: -# list.append([stat, self.get_attr(stat.name())]) -# return list - - -# def get_attr(self, name): -# val = self._value(name) -# vType = val.getType() -# if vType == TYPE_UINT8: return val.asUint() -# elif vType == TYPE_UINT16: return val.asUint() -# elif vType == TYPE_UINT32: return val.asUint() -# elif vType == TYPE_UINT64: return val.asUint64() -# elif vType == TYPE_SSTR: return val.asString() -# elif vType == TYPE_LSTR: return val.asString() -# elif vType == TYPE_ABSTIME: return val.asInt64() -# elif vType == TYPE_DELTATIME: return val.asUint64() -# elif vType == TYPE_REF: return ObjectId(val.asObjectId()) -# elif vType == TYPE_BOOL: return val.asBool() -# elif vType == TYPE_FLOAT: return val.asFloat() -# elif vType == TYPE_DOUBLE: return val.asDouble() -# elif vType == TYPE_UUID: return val.asUuid() -# elif vType == TYPE_INT8: return val.asInt() -# elif vType == TYPE_INT16: return val.asInt() -# elif vType == TYPE_INT32: return val.asInt() -# elif vType == TYPE_INT64: return val.asInt64() -# else: -# # when TYPE_MAP -# # when TYPE_OBJECT -# # when TYPE_LIST -# # when TYPE_ARRAY -# logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) -# return None - - -# def set_attr(self, name, v): -# val = self._value(name) -# vType = val.getType() -# if vType == TYPE_UINT8: return val.setUint(v) -# elif vType == TYPE_UINT16: return val.setUint(v) -# elif vType == TYPE_UINT32: return val.setUint(v) -# elif vType == TYPE_UINT64: return val.setUint64(v) -# elif vType == TYPE_SSTR: -# if v: return val.setString(v) -# else: return val.setString('') -# elif vType == TYPE_LSTR: -# if v: return val.setString(v) -# else: return val.setString('') -# elif vType == TYPE_ABSTIME: return val.setInt64(v) -# elif vType == TYPE_DELTATIME: return val.setUint64(v) -# elif vType == TYPE_REF: return val.setObjectId(v.impl) -# elif vType == TYPE_BOOL: return val.setBool(v) -# elif vType == TYPE_FLOAT: return val.setFloat(v) -# elif vType == TYPE_DOUBLE: return val.setDouble(v) -# elif vType == TYPE_UUID: return val.setUuid(v) -# elif vType == TYPE_INT8: return val.setInt(v) -# elif vType == TYPE_INT16: return val.setInt(v) -# elif vType == TYPE_INT32: return val.setInt(v) -# elif vType == TYPE_INT64: return val.setInt64(v) -# else: -# # when TYPE_MAP -# # when TYPE_OBJECT -# # when TYPE_LIST -# # when TYPE_ARRAY -# logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) -# return None - - -# def __getitem__(self, name): -# return self.get_attr(name) - - -# def __setitem__(self, name, value): -# self.set_attr(name, value) - - -# def inc_attr(self, name, by=1): -# self.set_attr(name, self.get_attr(name) + by) - - -# def dec_attr(self, name, by=1): -# self.set_attr(name, self.get_attr(name) - by) - - - - -# def _invokeMethod(self, name, argMap): -# """ -# Private: Helper function that invokes an object's method, and waits for the result. -# """ -# self._cv.acquire() -# try: -# timeout = 30 -# self._sync_count = 1 -# self.impl.invokeMethod(name, argMap, self) -# if self._broker: -# self._broker.conn.kick() -# self._cv.wait(timeout) -# if self._sync_count == 1: -# raise Exception("Timed out: waiting for response to method call.") -# finally: -# self._cv.release() - -# return self._sync_result - - -# def _method_result(self, result): -# """ -# Called to return the result of a method call on an object -# """ -# self._cv.acquire(); -# try: -# self._sync_result = result -# self._sync_count -= 1 -# self._cv.notify() -# finally: -# self._cv.release() - - -# def _marshall(schema, args): -# ''' -# Private: Convert a list of arguments (positional) into a Value object of type "map". -# Used to create the argument parameter for an object's method invokation. -# ''' -# # Build a map of the method's arguments -# map = qmfengine.Value(TYPE_MAP) -# for arg in schema.arguments: -# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: -# map.insert(arg.name, qmfengine.Value(arg.typecode)) - -# # install each argument's value into the map -# marshalled = Arguments(map) -# idx = 0 -# for arg in schema.arguments: -# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: -# if args[idx]: -# marshalled[arg.name] = args[idx] -# idx += 1 - -# return marshalled.map - - -# def _value(self, name): -# val = self.impl.getValue(name) -# if not val: -# raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % -# (name, -# self.object_class.impl.getClassKey().getPackageName(), -# self.object_class.impl.getClassKey().getClassName())) -# return val - - - - class Arguments(object): def __init__(self, map): @@ -1092,7 +754,7 @@ class MethodResponse(object): -class QmfQuery(object): +class QmfQuery(_mapEncoder): _TARGET="what" _PREDICATE="where" @@ -1223,18 +885,19 @@ class QmfQuery(object): def getPredicate(self): return self._predicate - def mapEncode(self): + def map_encode(self): _map = {} _map[self._TARGET] = self._target_map - _map[self._PREDICATE] = self._predicate.mapEncode() + if self._predicate is not None: + _map[self._PREDICATE] = self._predicate.map_encode() return _map def __repr__(self): - return str(self.mapEncode()) + return "QmfQuery=<<" + str(self.map_encode()) + ">>" -class QmfQueryPredicate(object): +class QmfQueryPredicate(_mapEncoder): """ Class for Query predicates. """ @@ -1255,7 +918,6 @@ class QmfQueryPredicate(object): logic_op = False if type(pmap) == dict: for key in pmap.iterkeys(): - logging.error("key = %s" % key) if key in self._valid_cmp_ops: # coparison operation - may have "name" and "value" self._oper = key @@ -1283,7 +945,6 @@ class QmfQueryPredicate(object): """ Append another operand to a predicate expression """ - logging.error("Appending: '%s'" % str(operand)) self._operands.append(operand) @@ -1311,15 +972,16 @@ class QmfQueryPredicate(object): # @todo: support regular expression match name = self._operands[0] logging.debug("looking for: '%s'" % str(name)) - if not qmfData.hasProperty(name): - logging.warning("Malformed query, attribute '%s' not present." % name) + if not qmfData.has_value(name): + logging.warning("Malformed query, attribute '%s' not present." + % name) return False - arg1 = qmfData.getProperty(name) - arg1Type = type(arg1) + + arg1 = qmfData.get_value(name) + arg2 = self._operands[1] logging.debug("query evaluate %s: '%s' '%s' '%s'" % - (name, str(arg1), self._oper, str(self._operands[1]))) + (name, str(arg1), self._oper, str(arg2))) try: - arg2 = arg1Type(self._operands[1]) if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2 if self._oper == QmfQuery._CMP_NE: return arg1 != arg2 if self._oper == QmfQuery._CMP_LT: return arg1 < arg2 @@ -1342,7 +1004,7 @@ class QmfQueryPredicate(object): return False name = self._operands[0] logging.debug("query evaluate PRESENT: [%s]" % str(name)) - return qmfData.hasProperty(name) + return qmfData.has_value(name) if self._oper == QmfQuery._LOGIC_AND: logging.debug("query evaluate AND: '%s'" % str(self._operands)) @@ -1369,12 +1031,12 @@ class QmfQueryPredicate(object): return False - def mapEncode(self): + def map_encode(self): _map = {} _list = [] for exp in self._operands: if isinstance(exp, QmfQueryPredicate): - _list.append(exp.mapEncode()) + _list.append(exp.map_encode()) else: _list.append(exp) _map[self._oper] = _list @@ -1382,7 +1044,7 @@ class QmfQueryPredicate(object): def __repr__(self): - return str(self.mapEncode()) + return "QmfQueryPredicate=<<" + str(self.map_encode()) + ">>" @@ -1390,15 +1052,6 @@ class QmfQueryPredicate(object): ## SCHEMA ##============================================================================== -# known schema types - -SchemaTypeData = "data" -SchemaTypeEvent = "event" - -# format convention for schema hash - -_schemaHashStrFormat = "%08x-%08x-%08x-%08x" -_schemaHashStrDefault = "00000000-00000000-00000000-00000000" # Argument typecodes, access, and direction qualifiers @@ -1458,7 +1111,7 @@ def _toBool( param ): -class SchemaClassId(object): +class SchemaClassId(_mapEncoder): """ Unique identifier for an instance of a SchemaClass. @@ -1470,20 +1123,40 @@ class SchemaClassId(object): map["hash_str"] = str, hash value in standard format or None if hash is unknown. """ - def __init__(self, pname, cname, stype=SchemaTypeData, hstr=None): + KEY_PACKAGE="_package_name" + KEY_CLASS="_class_name" + KEY_TYPE="_type" + KEY_HASH="_hash_str" + + TYPE_DATA = "_data" + TYPE_EVENT = "event" + + _valid_types=[TYPE_DATA, TYPE_EVENT] + _schemaHashStrFormat = "%08x-%08x-%08x-%08x" + _schemaHashStrDefault = "00000000-00000000-00000000-00000000" + + def __init__(self, pname=None, cname=None, stype=TYPE_DATA, hstr=None, + _map=None): """ @type pname: str @param pname: the name of the class's package @type cname: str @param cname: name of the class @type stype: str - @param stype: schema type [SchemaTypeData | SchemaTypeEvent] + @param stype: schema type [data | event] @type hstr: str @param hstr: the hash value in '%08x-%08x-%08x-%08x' format """ + if _map is not None: + # construct from map + pname = _map.get(self.KEY_PACKAGE, pname) + cname = _map.get(self.KEY_CLASS, cname) + stype = _map.get(self.KEY_TYPE, stype) + hstr = _map.get(self.KEY_HASH, hstr) + self._pname = pname self._cname = cname - if stype != SchemaTypeData and stype != SchemaTypeEvent: + if stype not in SchemaClassId._valid_types: raise TypeError("Invalid SchemaClassId type: '%s'" % stype) self._type = stype self._hstr = hstr @@ -1498,8 +1171,17 @@ class SchemaClassId(object): except: raise Exception("Invalid SchemaClassId format: bad hash string: '%s':" % hstr) + # constructor + def _create(cls, pname, cname, stype=TYPE_DATA, hstr=None): + return cls(pname=pname, cname=cname, stype=stype, hstr=hstr) + create = classmethod(_create) - def getPackageName(self): + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def get_package_name(self): """ Access the package name in the SchemaClassId. @@ -1508,7 +1190,7 @@ class SchemaClassId(object): return self._pname - def getClassName(self): + def get_class_name(self): """ Access the class name in the SchemaClassId @@ -1517,7 +1199,7 @@ class SchemaClassId(object): return self._cname - def getHashString(self): + def get_hash_string(self): """ Access the schema's hash as a string value @@ -1526,7 +1208,7 @@ class SchemaClassId(object): return self._hstr - def getType(self): + def get_type(self): """ Returns the type code associated with this Schema @@ -1534,27 +1216,25 @@ class SchemaClassId(object): """ return self._type - def mapEncode(self): + def map_encode(self): _map = {} - _map["package_name"] = self._pname - _map["class_name"] = self._cname - _map["type"] = self._type - if self._hstr: _map["hash_str"] = self._hstr + _map[self.KEY_PACKAGE] = self._pname + _map[self.KEY_CLASS] = self._cname + _map[self.KEY_TYPE] = self._type + if self._hstr: _map[self.KEY_HASH] = self._hstr return _map def __repr__(self): - if self._type == SchemaTypeEvent: - stype = "event" - else: - stype = "data" - hstr = self.getHashString() + hstr = self.get_hash_string() if not hstr: - hstr = _schemaHashStrDefault - return self._pname + ":" + self._cname + ":" + stype + "(" + hstr + ")" + hstr = SchemaClassId._schemaHashStrDefault + return self._pname + ":" + self._cname + ":" + self._type + "(" + hstr + ")" def __cmp__(self, other): - if not isinstance(other, SchemaClassId) : + if isinstance(other, dict): + other = SchemaClassId.from_map(other) + if not isinstance(other, SchemaClassId): raise TypeError("Invalid types for compare") # return 1 me = str(self) @@ -1571,55 +1251,7 @@ class SchemaClassId(object): -def SchemaClassIdFactory( param ): - """ - Factory for constructing SchemaClassIds from various sources - - @type param: various - @param param: object to use for constructing SchemaClassId - @rtype: SchemaClassId - @returns: a new SchemaClassId instance - """ - if type(param) == str: - # construct from __repr__/__str__ representation - try: - pname, cname, rest = param.split(":") - stype,rest = rest.split("("); - hstr = rest.rstrip(")"); - if hstr == _schemaHashStrDefault: - hstr = None - return SchemaClassId( pname, cname, stype, hstr ) - except: - raise TypeError("Invalid string format: '%s'" % param) - - if type(param) == dict: - # construct from map representation - if "package_name" in param: - pname = param["package_name"] - else: - raise TypeError("'package_name' attribute is required") - - if "class_name" in param: - cname = param["class_name"] - else: - raise TypeError("'class_name' attribute is required") - - hstr = None - if "hash_str" in param: - hstr = param["hash_str"] - - stype = "data" - if "type" in param: - stype = param["type"] - - return SchemaClassId( pname, cname, stype, hstr ) - - else: - raise TypeError("Invalid type for SchemaClassId construction") - - - -class SchemaProperty(object): +class SchemaProperty(_mapEncoder): """ Describes the structure of a Property data object. Map format: @@ -1641,8 +1273,17 @@ class SchemaProperty(object): """ __hash__ = None _access_strings = ["RO","RW","RC"] - def __init__(self, typeCode, kwargs={}): - self._type = typeCode + _dir_strings = ["I", "O", "IO"] + def __init__(self, _type_code=None, _map=None, kwargs={}): + if _map is not None: + # construct from map + _type_code = _map.get("amqp_type", _type_code) + kwargs = _map + if not _type_code: + raise TypeError("SchemaProperty: amqp_type is a mandatory" + " parameter") + + self._type = _type_code self._access = "RO" self._isIndex = False self._isOptional = False @@ -1653,6 +1294,8 @@ class SchemaProperty(object): self._desc = None self._reference = None self._isParentRef = False + self._dir = None + self._default = None for key, value in kwargs.items(): if key == "access": @@ -1669,6 +1312,22 @@ class SchemaProperty(object): elif key == "desc" : self._desc = value elif key == "reference" : self._reference = value elif key == "parent_ref" : self._isParentRef = _toBool(value) + elif key == "dir": + value = str(value).upper() + if value not in self._dir_strings: + raise TypeError("invalid value for direction parameter: '%s'" % value) + self._dir = value + elif key == "default" : self._default = value + + # constructor + def _create(cls, type_code, kwargs={}): + return cls(_type_code=type_code, kwargs=kwargs) + create = classmethod(_create) + + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) def getType(self): return self._type @@ -1692,7 +1351,11 @@ class SchemaProperty(object): def isParentRef(self): return self._isParentRef - def mapEncode(self): + def getDirection(self): return self._dir + + def getDefault(self): return self._default + + def map_encode(self): """ Return the map encoding of this schema. """ @@ -1708,9 +1371,12 @@ class SchemaProperty(object): if self._desc: _map["desc"] = self._desc if self._reference: _map["reference"] = self._reference _map["parent_ref"] = self._isParentRef + if self._dir: _map["dir"] = self._dir + if self._default: _map["default"] = self._default return _map - def __repr__(self): return str(self.mapEncode()) + def __repr__(self): + return "SchemaProperty=<<" + str(self.map_encode()) + ">>" def _updateHash(self, hasher): """ @@ -1722,122 +1388,21 @@ class SchemaProperty(object): if self._access: hasher.update(self._access) if self._unit: hasher.update(self._unit) if self._desc: hasher.update(self._desc) + if self._dir: hasher.update(self._dir) + if self._default: hasher.update(self._default) -def SchemaPropertyFactory( _map ): - """ - Factory for constructing SchemaProperty from a map - - @type _map: dict - @param _map: from mapEncode() of SchemaProperty - @rtype: SchemaProperty - @returns: a new SchemaProperty instance - """ - if type(_map) == dict: - # construct from map - if "amqp_type" not in _map: - raise TypeError("requires 'amqp_type' value") - - return SchemaProperty( _map["amqp_type"], _map ) - - else: - raise TypeError("Invalid type for SchemaProperty construction") - - - - -class SchemaArgument(object): - """ - Describes the structure of an Argument to a Method call. - Map format: - map["amqp_type"] = int, type code indicating argument's data type - map["dir"] = str, direction for an argument associated with a - Method, "I"|"O"|"IO", default value: "I" - optional: - map["desc"] = str, human-readable description of this argument - map["default"] = by amqp_type, default value to use if none provided - """ - __hash__ = None - _dir_strings = ["I", "O", "IO"] - def __init__(self, typeCode, kwargs={}): - self._type = typeCode - self._dir = "I" - self._desc = None - self._default = None - - for key, value in kwargs.items(): - if key == "dir": - value = str(value).upper() - if value not in self._dir_strings: - raise TypeError("invalid value for dir parameter: '%s'" % value) - self._dir = value - elif key == "desc" : self._desc = value - elif key == "default" : self._default = value - - def getType(self): return self._type - - def getDirection(self): return self._dir - - def getDesc(self): return self._desc - - def getDefault(self): return self._default - - def mapEncode(self): - """ - Return the map encoding of this schema. - """ - _map = {} - _map["amqp_type"] = self._type - _map["dir"] = self._dir - # optional: - if self._default: _map["default"] = self._default - if self._desc: _map["desc"] = self._desc - return _map - - def __repr__(self): return str(self.mapEncode()) - - def _updateHash(self, hasher): - """ - Update the given hash object with a hash computed over this schema. - """ - hasher.update(str(self._type)) - hasher.update(self._dir) - if self._desc: hasher.update(self._desc) - - - -def SchemaArgumentFactory( param ): - """ - Factory for constructing SchemaArguments from various sources - - @type param: various - @param param: object to use for constructing SchemaArgument - @rtype: SchemaArgument - @returns: a new SchemaArgument instance - """ - if type(param) == dict: - # construct from map - if not "amqp_type" in param: - raise TypeError("requires 'amqp_type' value") - - return SchemaArgument( param["amqp_type"], param ) - - else: - raise TypeError("Invalid type for SchemaArgument construction") - - - -class SchemaMethod(object): +class SchemaMethod(_mapEncoder): """ The SchemaMethod class describes the method's structure, and contains a SchemaProperty class for each argument declared by the method. Map format: - map["arguments"] = map of "name"= pairs. + map["arguments"] = map of "name"= pairs. map["desc"] = str, description of the method """ - def __init__(self, args={}, _desc=None): + def __init__(self, args={}, _desc=None, _map=None): """ Construct a SchemaMethod. @@ -1846,9 +1411,22 @@ class SchemaMethod(object): @type _desc: str @param _desc: Human-readable description of the schema """ + if _map is not None: + _desc = _map.get("desc") + margs = _map.get("arguments", args) + # margs are in map format - covert to SchemaProperty + args = {} + for name,val in margs.iteritems(): + args[name] = SchemaProperty.from_map(val) + self._arguments = args.copy() self._desc = _desc + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + def getDesc(self): return self._desc def getArgCount(self): return len(self._arguments) @@ -1857,7 +1435,7 @@ class SchemaMethod(object): def getArgument(self, name): return self._arguments[name] - def addArgument(self, name, schema): + def add_argument(self, name, schema): """ Add an argument to the list of arguments passed to this method. Used by an agent for dynamically creating method schema. @@ -1871,29 +1449,28 @@ class SchemaMethod(object): raise TypeError("argument must be a SchemaProperty class") self._arguments[name] = schema - def mapEncode(self): + def map_encode(self): """ Return the map encoding of this schema. """ _map = {} _args = {} for name,val in self._arguments.iteritems(): - _args[name] = val.mapEncode() + _args[name] = val.map_encode() _map["arguments"] = _args if self._desc: _map["desc"] = self._desc return _map def __repr__(self): - result = "(" + result = "SchemaMethod=< objects @param _methods: all methods provided by this schema """ - super(SchemaObjectClass, self).__init__(pname, - cname, - SchemaTypeData, - desc, - _hash) - self._properties = _props.copy() - self._pkeyNames = _pkey[:] - self._methods = _methods.copy() - - def getPrimaryKeyList(self): return self._pkeyNames[:] - - def getMethodCount(self): return len(self._methods) - def getMethods(self): return self._methods.copy() - def getMethod(self, name): return self._methods[name] - def addMethod(self, name, method): - self._methods[name] = method - self._classId = None - - - -def SchemaObjectClassFactory( param ): - """ - Factory for constructing a SchemaObjectClass from various sources. + if _map is not None: + super(SchemaObjectClass,self).__init__(_map=_map) + else: + super(SchemaObjectClass, self).__init__(_classId=_classId, _desc=_desc) + self._object_id_names = _object_id_names + for name,value in _props.iteritems(): + self.set_value(name, value, self.SUBTYPE_PROPERTY) + for name,value in _methods.iteritems(): + self.set_value(name, value, self.SUBTYPE_METHOD) + + if self._classId.get_type() != SchemaClassId.TYPE_DATA: + raise TypeError("Invalid ClassId type for data schema: %s" % self._classId) + + def get_id_names(self): + return self._object_id_names[:] + + def get_method_count(self): + count = 0 + for value in self._subtypes.itervalues(): + if value == self.SUBTYPE_METHOD: + count += 1 + return count + + def get_methods(self): + meths = {} + for name,value in self._subtypes.iteritems(): + if value == self.SUBTYPE_METHOD: + meths[name] = self._values.get(name) + return meths + + def get_method(self, name): + if self._subtypes.get(name) == self.SUBTYPE_METHOD: + return self._values.get(name) + return None - @type param: various - @param param: object to use for constructing a SchemaObjectClass instance - @rtype: SchemaObjectClass - @returns: a new SchemaObjectClass instance - """ - if type(param) == dict: - classId = None - properties = {} - methods = {} - pkey = [] - if "schema_id" in param: - classId = SchemaClassIdFactory(param["schema_id"]) - if (not classId) or (classId.getType() != SchemaTypeData): - raise TypeError("Invalid SchemaClassId specified: %s" % classId) - if "desc" in param: - desc = param["desc"] - if "primary_key" in param: - pkey = param["primary_key"] - if "properties" in param: - for name,val in param["properties"].iteritems(): - properties[name] = SchemaPropertyFactory(val) - if "methods" in param: - for name,val in param["methods"].iteritems(): - methods[name] = SchemaMethodFactory(val) - - return SchemaObjectClass( classId.getPackageName(), - classId.getClassName(), - desc, - _hash = classId.getHashString(), - _props = properties, _pkey = pkey, - _methods = methods) + def add_method(self, name, method): + self.set_value(name, method, self.SUBTYPE_METHOD) + # need to re-generate schema hash + self._classId._hstr = None - else: - raise TypeError("Invalid type for SchemaObjectClass construction") @@ -2162,46 +1711,21 @@ class SchemaEventClass(SchemaClass): map["desc"] = string description of this schema map["properties"] = map of "name":SchemaProperty values. """ - def __init__( self, pname, cname, desc=None, _props={}, _hash=None ): - super(SchemaEventClass, self).__init__(pname, - cname, - SchemaTypeEvent, - desc, - _hash ) - self._properties = _props.copy() - + def __init__(self, _classId=None, _desc=None, _props={}, + _map=None): + if _map is not None: + super(SchemaEventClass,self).__init__(_map=_map) + else: + super(SchemaEventClass, self).__init__(_classId=_classId, + _desc=_desc) + for name,value in _props.iteritems(): + self.set_value(name, value, self.SUBTYPE_PROPERTY) + if self._classId.get_type() != SchemaClassId.TYPE_EVENT: + raise TypeError("Invalid ClassId type for event schema: %s" % + self._classId) -def SchemaEventClassFactory( param ): - """ - Factory for constructing a SchemaEventClass from various sources. - @type param: various - @param param: object to use for constructing a SchemaEventClass instance - @rtype: SchemaEventClass - @returns: a new SchemaEventClass instance - """ - if type(param) == dict: - logging.debug( "constructing SchemaEventClass from map '%s'" % param ) - classId = None - properties = {} - if "schema_id" in param: - classId = SchemaClassIdFactory(param["schema_id"]) - if (not classId) or (classId.getType() != SchemaTypeEvent): - raise TypeError("Invalid SchemaClassId specified: %s" % classId) - if "desc" in param: - desc = param["desc"] - if "properties" in param: - for name,val in param["properties"].iteritems(): - properties[name] = SchemaPropertyFactory(val) - - return SchemaEventClass( classId.getPackageName(), - classId.getClassName(), - desc, - _hash = classId.getHashString(), - _props = properties ) - else: - raise TypeError("Invalid type for SchemaEventClass construction") diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index cc8c284579..027ce163e5 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -30,10 +30,9 @@ from threading import Condition from qpid.messaging import * -from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, - AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode, - QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey, - QmfData) +from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, + QmfQueryPredicate, MsgKey, QmfData, QmfAddress, + AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION) @@ -186,69 +185,106 @@ class SequencedWaiter(object): return False +##============================================================================== +## DATA MODEL +##============================================================================== + -#class ObjectProxy(QmfObject): -class ObjectProxy(object): +class QmfConsoleData(QmfData): """ - A local representation of a QmfObject that is managed by a remote agent. + Console's representation of an managed QmfData instance. """ - def __init__(self, agent, cls, kwargs={}): + def __init__(self, map_, agent, _schema=None): + super(QmfConsoleData, self).__init__(_map=map_, + _schema=_schema, + _const=True) + self._agent = agent + + def get_timestamps(self): """ - @type agent: qmfConsole.AgentProxy - @param agent: Agent that manages this object. - @type cls: qmfCommon.SchemaObjectClass - @param cls: Schema that describes the class. - @type kwargs: dict - @param kwargs: ??? supported keys ??? + Returns a list of timestamps describing the lifecycle of + the object. All timestamps are represented by the AMQP + timestamp type. [0] = time of last update from Agent, + [1] = creation timestamp + [2] = deletion timestamp, or zero if not + deleted. """ - # QmfObject.__init__(self, cls, kwargs) - self._agent = agent + return [self._utime, self._ctime, self._dtime] + + def get_create_time(self): + """ + returns the creation timestamp + """ + return self._ctime - # def update(self): - def refresh(self, timeout = None): + def get_update_time(self): + """ + returns the update timestamp """ - Called to re-fetch the current state of the object from the agent. This updates - the contents of the object to their most current values. + return self._utime - @rtype: bool - @return: True if refresh succeeded. Refresh may fail if agent does not respond. + def get_delete_time(self): + """ + returns the deletion timestamp, or zero if not yet deleted. """ - if not self._agent: - raise Exception("No Agent associated with this object") - newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout) - if newer == None: - logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) - raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) - #self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class + return self._dtime - ### def _merge_update(self, newerObject): - ### ??? in Rafi's console.py::Object Class + def is_deleted(self): + """ + True if deletion timestamp not zero. + """ + return self._dtime != long(0) + def refresh(self, _reply_handle=None, _timeout=None): + """ + request that the Agent update the value of this object's + contents. + """ + logging.error(" TBD!!!") + return None - ### def is_deleted(self): - ### ??? in Rafi's console.py::Object Class + def invoke_method(self, name, _in_args=None, _reply_handle=None, + _timeout=None): + """ + invoke the named method. + """ + logging.error(" TBD!!!") + return None - def key(self): pass +class QmfLocalData(QmfData): + """ + Console's representation of an unmanaged QmfData instance. There + is no remote agent associated with this instance. The Console has + full control over this instance. + """ + def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfLocalData, self).__init__(_values=values, + _subtypes=_subtypes, _tag=_tag, + _object_id=_object_id, + _schema=_schema, _ctime=ctime, + _utime=ctime, _const=False) class Agent(object): """ A local representation of a remote agent managed by this console. """ - def __init__(self, agent_id, console): + def __init__(self, name, console): """ @type name: AgentId @param name: uniquely identifies this agent in the AMQP domain. """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") + if not isinstance(console, Console): raise TypeError("parameter must be an instance of class Console") - self._id = agent_id - self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id) + self._name = name + self._address = QmfAddress.direct(name, console._domain) self._console = console self._sender = None self._packages = {} # map of {package-name:[list of class-names], } for this agent @@ -257,8 +293,8 @@ class Agent(object): logging.debug( "Created Agent with address: [%s]" % self._address ) - def getAgentId(self): - return self._id + def get_name(self): + return self._name def isActive(self): return self._announce_timestamp != None @@ -267,7 +303,7 @@ class Agent(object): """ Low-level routine to asynchronously send a message to this agent. """ - msg.reply_to = self._console.address() + msg.reply_to = str(self._console._address) # handle = self._console._req_correlation.allocate() # if handle == 0: # raise Exception("Can not allocate a correlation id!") @@ -329,7 +365,7 @@ class Agent(object): pass def __repr__(self): - return self._address + return str(self._address) def __str__(self): return self.__repr__() @@ -339,7 +375,7 @@ class Agent(object): """ msg = Message(subject=makeSubject(OpCode.get_query), properties={"method":"request"}, - content={MsgKey.query: query.mapEncode()}) + content={MsgKey.query: query.map_encode()}) self._sendMsg( msg, correlation_id ) @@ -389,7 +425,7 @@ class Console(Thread): """ A Console manages communications to a collection of agents on behalf of an application. """ - def __init__(self, name=None, notifier=None, + def __init__(self, name=None, _domain=None, notifier=None, reply_timeout = 60, # agent_timeout = 120, agent_timeout = 60, @@ -403,10 +439,12 @@ class Console(Thread): @param kwargs: ??? Unused """ Thread.__init__(self) - self._name = name - if not self._name: + if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) - self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name + else: + self._name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier self._lock = Lock() self._conn = None @@ -467,10 +505,24 @@ class Console(Thread): raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) - self._direct_recvr = self._session.receiver(self._address, capacity=1) - self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION, + self._direct_recvr = self._session.receiver(str(self._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=1) + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + logging.debug("agent.ind addr=%s" % ind_addr) + self._announce_recvr = self._session.receiver(str(ind_addr) + + ";{create:always," + " node-properties:{type:topic}}", capacity=1) - self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE) + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + logging.debug("agent.locate addr=%s" % locate_addr) + self._locate_sender = self._session.sender(str(locate_addr) + + ";{create:always," + " node-properties:{type:topic}}") # # Now that receivers are created, fire off the receive thread... # @@ -495,7 +547,7 @@ class Console(Thread): if self.isAlive(): # kick my thread to wake it up logging.debug("Making temp sender for [%s]" % self._address) - tmp_sender = self._session.sender(self._address) + tmp_sender = self._session.sender(str(self._address)) try: msg = Message(subject=makeSubject(OpCode.noop)) tmp_sender.send( msg, sync=True ) @@ -535,21 +587,17 @@ class Console(Thread): finally: self._lock.release() - - - - def findAgent(self, agent_id, timeout=None ): + def findAgent(self, name, timeout=None ): """ Given the id of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") self._lock.acquire() try: - if agent_id in self._agent_map: - return self._agent_map[agent_id] + agent = self._agent_map.get(name) + if agent: + return agent finally: self._lock.release() @@ -559,17 +607,21 @@ class Console(Thread): if handle == 0: raise Exception("Can not allocate a correlation id!") try: - tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)) + tmp_sender = self._session.sender(str(QmfAddress.direct(name, + self._domain)) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, QmfQuery._PREDICATE: - {QmfQuery._LOGIC_AND: - [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]}, - {QmfQuery._CMP_EQ: ["product", agent_id.product()]}, - {QmfQuery._CMP_EQ: ["name", agent_id.name()]}]}}) + {QmfQuery._CMP_EQ: ["_name", name]}}) msg = Message(subject=makeSubject(OpCode.agent_locate), properties={"method":"request"}, - content={MsgKey.query: query.mapEncode()}) - msg.reply_to = self._address + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) msg.correlation_id = str(handle) logging.debug("Sending Agent Locate (%s)" % time.time()) tmp_sender.send( msg ) @@ -588,13 +640,43 @@ class Console(Thread): logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: - if agent_id in self._agent_map: - new_agent = self._agent_map[agent_id] + new_agent = self._agent_map.get(name) finally: self._lock.release() return new_agent + def doQuery(self, agent, query, timeout=None ): + """ + """ + + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._sendQuery(query, handle) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if not timeout: + timeout = self._reply_timeout + + logging.debug("Waiting for response to Query (%s)" % timeout) + reply = self._req_correlation.get_data(handle, timeout) + self._req_correlation.release(handle) + logging.debug("Agent Query wait ended (%s)" % time.time()) + if reply: + print("Agent Query Reply='%s'" % reply) + return reply.content + else: + print("Agent Query FAILED!!!") + return None + + + def run(self): global _callback_thread # @@ -607,7 +689,6 @@ class Console(Thread): try: msg = self._announce_recvr.fetch(timeout = 0) if msg: - logging.error( "Announce Msg Rcvd@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=False) except Empty: pass @@ -615,7 +696,6 @@ class Console(Thread): try: msg = self._direct_recvr.fetch(timeout = 0) if msg: - logging.error( "Direct Msg Rcvd@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=True) except Empty: pass @@ -654,6 +734,9 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ + + logging.error( "Message received from Agent! [%s]" % msg ) + try: version,opcode = parseSubject(msg.subject) # @todo: deal with version mismatch!!! @@ -670,7 +753,7 @@ class Console(Thread): if opcode == OpCode.agent_ind: self._handleAgentIndMsg( msg, cmap, version, _direct ) elif opcode == OpCode.data_ind: - logging.warning("!!! data_ind TBD !!!") + self._handleDataIndMsg(msg, cmap, version, _direct) elif opcode == OpCode.event_ind: logging.warning("!!! event_ind TBD !!!") elif opcode == OpCode.managed_object: @@ -696,7 +779,8 @@ class Console(Thread): if MsgKey.agent_info in cmap: try: - agent_id = AgentIdFactory(cmap[MsgKey.agent_info]) + # TODO: fix + name = cmap[MsgKey.agent_info]["_name"] except: logging.warning("Bad agent-ind message received: '%s'" % msg) return @@ -709,21 +793,22 @@ class Console(Thread): if direct and correlated: ignore = False elif self._agent_discovery_filter: - matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) - ignore = not matched + logging.error("FIXME: agent discovery filter - new agent name style") + # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) + # ignore = not matched + matched = True; ignore = False # for now if not ignore: agent = None self._lock.acquire() try: - if agent_id in self._agent_map: - agent = self._agent_map[agent_id] + agent = self._agent_map.get(name) finally: self._lock.release() if not agent: # need to create and add a new agent - agent = self._createAgent(agent_id) + agent = self._createAgent(name) # lock out expiration scanning code self._lock.acquire() @@ -746,6 +831,22 @@ class Console(Thread): + + def _handleDataIndMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.error("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + def _expireAgents(self): """ Check for expired agents and issue notifications when they expire. @@ -777,21 +878,26 @@ class Console(Thread): - def _createAgent( self, agent_id ): + def _createAgent( self, name ): """ Factory to create/retrieve an agent for this console """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") self._lock.acquire() try: - if agent_id in self._agent_map: - return self._agent_map[agent_id] - - agent = Agent(agent_id, self) - agent._sender = self._session.sender(agent._address) - self._agent_map[agent_id] = agent + agent = self._agent_map.get(name) + if agent: + return agent + + agent = Agent(name, self) + agent._sender = self._session.sender(str(agent._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + + self._agent_map[name] = agent finally: self._lock.release() @@ -1235,33 +1341,13 @@ class Console(Thread): if __name__ == '__main__': # temp test code - from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, - SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed, - QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, - QmfEvent) - logging.getLogger().setLevel(logging.INFO) - - logging.info( "Starting Connection" ) - _c = Connection("localhost") - _c.connect() - #c.start() + from qmfCommon import (qmfTypes, QmfData, + QmfEvent, SchemaClassId, SchemaEventClass, + SchemaProperty, SchemaObjectClass) - logging.info( "Starting Console" ) - _myConsole = Console() - _myConsole.addConnection( _c ) - - logging.info( "Finding Agent" ) - _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 ) - - logging.info( "Agent Found: %s" % _myAgent ) - - logging.info( "Removing connection" ) - _myConsole.removeConnection( _c, 10 ) - - logging.info( "Destroying console:" ) - _myConsole.destroy( 10 ) + logging.getLogger().setLevel(logging.INFO) - logging.info( "************* Starting Async Console **************" ) + logging.info( "************* Creating Async Console **************" ) class MyNotifier(Notifier): def __init__(self, context): @@ -1275,239 +1361,179 @@ if __name__ == '__main__': _noteMe = MyNotifier( 666 ) _myConsole = Console(notifier=_noteMe) - _myConsole.addConnection( _c ) _myConsole.enableAgentDiscovery() logging.info("Waiting...") - while not _noteMe.WorkAvailable: - try: - print("No work yet...sleeping!") - time.sleep(1) - except KeyboardInterrupt: - break - - - print("Work available = %d items!" % _myConsole.getWorkItemCount()) - _wi = _myConsole.getNextWorkItem(timeout=0) - while _wi: - print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _wi = _myConsole.getNextWorkItem(timeout=0) - - - logging.info( "Removing connection" ) - _myConsole.removeConnection( _c, 10 ) - logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) logging.info( "******** Messing around with Schema ********" ) - _sec = SchemaEventClassFactory( { "schema_id": # SchemaClassId map - {"package_name": "myPackage", - "class_name": "myClass", - "type": "event"}, - "desc": "A typical event schema", - "properties": {"Argument-1": - {"amqp_type": qmfTypes.TYPE_UINT8, - "min": 0, - "max": 100, - "unit": "seconds", - "desc": "sleep value"}, - "Argument-2": - {"amqp_type": qmfTypes.TYPE_LSTR, - "maxlen": 100, - "desc": "a string argument"}}} ) - print("_sec=%s" % _sec.getClassId()) - print("_sec.gePropertyCount()=%d" % _sec.getPropertyCount() ) - print("_sec.getProperty('Argument-1`)=%s" % _sec.getProperty('Argument-1') ) - print("_sec.getProperty('Argument-2`)=%s" % _sec.getProperty('Argument-2') ) + _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A typical event schema", + _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, + kwargs = {"min":0, + "max":100, + "unit":"seconds", + "desc":"sleep value"}), + "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, + kwargs={"maxlen":100, + "desc":"a string argument"})}) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) try: - print("_sec.getProperty('not-found')=%s" % _sec.getProperty('not-found') ) + print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) except: pass - print("_sec.getProperties()='%s'" % _sec.getProperties()) + print("_sec.getProperties()='%s'" % _sec.get_properties()) print("Adding another argument") - _arg3 = SchemaPropertyFactory( { "amqp_type": qmfTypes.TYPE_BOOL, - "dir": "IO", - "desc": "a boolean argument"} ) - _sec.addProperty('Argument-3', _arg3) - print("_sec=%s" % _sec.getClassId()) - print("_sec.getPropertyCount()=%d" % _sec.getPropertyCount() ) - print("_sec.getProperty('Argument-1')=%s" % _sec.getProperty('Argument-1') ) - print("_sec.getProperty('Argument-2')=%s" % _sec.getProperty('Argument-2') ) - print("_sec.getProperty('Argument-3')=%s" % _sec.getProperty('Argument-3') ) - - print("_arg3.mapEncode()='%s'" % _arg3.mapEncode() ) - - _secmap = _sec.mapEncode() + _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, + kwargs={"dir":"IO", + "desc":"a boolean argument"}) + _sec.add_property('Argument-3', _arg3) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) + + print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) + + _secmap = _sec.map_encode() print("_sec.mapEncode()='%s'" % _secmap ) - _sec2 = SchemaEventClassFactory( _secmap ) - - print("_sec=%s" % _sec.getClassId()) - print("_sec2=%s" % _sec2.getClassId()) - - - - - _soc = SchemaObjectClassFactory( {"schema_id": {"package_name": "myOtherPackage", - "class_name": "myOtherClass", - "type": "data"}, - "desc": "A test data object", - "properties": - {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RO", - "index": True, - "unit": "degrees"}, - "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RW", - "index": True, - "desc": "The Second Property(tm)", - "unit": "radians"}, - "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, - "unit": "seconds", - "desc": "time until I retire"}}, - "methods": - {"meth1": {"desc": "A test method", - "arguments": - {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an argument 1", - "dir": "I"}, - "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, - "dir": "IO", - "desc": "some weird boolean"}}}, - "meth2": {"desc": "A test method", - "arguments": - {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an 'nuther argument", - "dir": "I"}}}}, - "primary_key": ["prop2", "prop1"]}) + _sec2 = SchemaEventClass( _map=_secmap ) + + print("_sec=%s" % _sec.get_class_id()) + print("_sec2=%s" % _sec2.get_class_id()) + + _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", + "_class_name": "myOtherClass", + "_type": "_data"}, + "_desc": "A test data object", + "_values": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}, + "meth1": {"desc": "A test method", + "arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"desc": "A test method", + "arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": + "I"}}}}, + "_subtypes": + {"prop1":"qmfProperty", + "prop2":"qmfProperty", + "statistics":"qmfProperty", + "meth1":"qmfMethod", + "meth2":"qmfMethod"}, + "_primary_key_names": ["prop2", "prop1"]}) print("_soc='%s'" % _soc) - print("_soc.getPrimaryKeyList='%s'" % _soc.getPrimaryKeyList()) + print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) - print("_soc.getPropertyCount='%d'" % _soc.getPropertyCount()) - print("_soc.getProperties='%s'" % _soc.getProperties()) - print("_soc.getProperty('prop2')='%s'" % _soc.getProperty('prop2')) + print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) + print("_soc.getProperties='%s'" % _soc.get_properties()) + print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) - print("_soc.getMethodCount='%d'" % _soc.getMethodCount()) - print("_soc.getMethods='%s'" % _soc.getMethods()) - print("_soc.getMethod('meth2')='%s'" % _soc.getMethod('meth2')) + print("_soc.getMethodCount='%d'" % _soc.get_method_count()) + print("_soc.getMethods='%s'" % _soc.get_methods()) + print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) - _socmap = _soc.mapEncode() + _socmap = _soc.map_encode() print("_socmap='%s'" % _socmap) - _soc2 = SchemaObjectClassFactory( _socmap ) + _soc2 = SchemaObjectClass( _map=_socmap ) print("_soc='%s'" % _soc) print("_soc2='%s'" % _soc2) - if _soc2.getClassId() == _soc.getClassId(): + if _soc2.get_class_id() == _soc.get_class_id(): print("soc and soc2 are the same schema") logging.info( "******** Messing around with ObjectIds ********" ) - oid = ObjectIdFactory( {"agent_id": {"vendor": "redhat.com", - "product": "mgmt-tool", - "name": "myAgent1"}, - "primary_key": "key1:key2" }) - - print("oid = %s" % oid) - - oid2 = ObjectIdFactory( oid.mapEncode() ) - - print("oid2 = %s" % oid2) - - if oid == oid2: - print("oid1 == oid2") - else: - print("oid1 != oid2") - - hashme = {oid: "myoid"} - print("oid hash = %s" % hashme[oid2] ) - - qd = QmfData( {"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) print("qd='%s':" % qd) print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) - print("qd map='%s'" % qd.mapEncode()) - print("qd getProperty('prop4')='%s'" % qd.getProperty("prop4")) - qd.setProperty("prop4", 4) - print("qd setProperty('prop4', 4)='%s'" % qd.getProperty("prop4")) + print("qd map='%s'" % qd.map_encode()) + print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) + qd.set_value("prop4", 4, "A test property called 4") + print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) qd.prop4 = 9 print("qd.prop4 = 9 ='%s'" % qd.prop4) qd["prop4"] = 11 print("qd[prop4] = 11 ='%s'" % qd["prop4"]) - print("qd.mapEncode()='%s'" % qd.mapEncode()) - _qd2 = QmfDataFactory( qd.mapEncode() ) - print("_qd2.mapEncode()='%s'" % _qd2.mapEncode()) + print("qd.mapEncode()='%s'" % qd.map_encode()) + _qd2 = QmfData( _map = qd.map_encode() ) + print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) - _qmfDesc1 = QmfDescribed( _schemaId = _soc.getClassId(), - _props = {"prop1": 1, "statistics": 666, "prop2": 0}) + _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, + "prop2": 0}}, + agent="some agent name?", + _schema = _soc) - print("_qmfDesc1 map='%s'" % _qmfDesc1.mapEncode()) + print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) - _qmfDesc1.setSchema( _soc ) + _qmfDesc1._set_schema( _soc ) - print("_qmfDesc1 props{} = '%s'" % _qmfDesc1.getProperties()) - print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.getPrimaryKey()) - print("_qmfDesc1 classid = '%s'" % _qmfDesc1.getSchemaClassId()) + print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) - _qmfDescMap = _qmfDesc1.mapEncode() + _qmfDescMap = _qmfDesc1.map_encode() print("_qmfDescMap='%s'" % _qmfDescMap) - _qmfDesc2 = QmfDescribedFactory( _qmfDescMap, _schema=_soc ) - - print("_qmfDesc2 map='%s'" % _qmfDesc2.mapEncode()) - print("_qmfDesc2 props = '%s'" % _qmfDesc2.getProperties()) - print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.getPrimaryKey()) - + _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) - _qmfMgd1 = QmfManaged( _agentId=AgentId("redhat.com", "anAgent", "tross"), - _schema = _soc, - _schemaId = _soc.getClassId(), - _props = {"prop1": 11, "prop2": 10, "statistics":999}) - - - print("_qmfMgd1 map='%s'" % _qmfMgd1.mapEncode()) - - print("_qmfMgd1.getObjectId()='%s'" % _qmfMgd1.getObjectId()) - print("_qmfMgd1 props = '%s'" % _qmfMgd1.getProperties()) - - _qmfMgd1Map = _qmfMgd1.mapEncode() - print("_qmfMgd1Map='%s'" % _qmfMgd1Map) - - _qmfMgd2 = QmfManagedFactory( param=_qmfMgd1.mapEncode(), _schema=_soc ) - - print("_qmfMgd2 map='%s'" % _qmfMgd2.mapEncode()) - print("_qmfMgd2 getObjectId() = '%s'" % _qmfMgd2.getObjectId()) - print("_qmfMgd2 props = '%s'" % _qmfMgd2.getProperties()) + print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) + print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) logging.info( "******** Messing around with QmfEvents ********" ) _qmfevent1 = QmfEvent( _timestamp = 1111, - _agentId = AgentId("redhat.com", "whizzbang2000", "ted"), - _schema = _sec, - _props = {"Argument-1": 77, - "Argument-3": True, - "Argument-2": "a string"}) - print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.mapEncode()) - print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.getTimestamp()) - print("_qmfevent1.getAgentId()='%s'" % _qmfevent1.getAgentId()) - - _qmfevent1Map = _qmfevent1.mapEncode() - - _qmfevent2 = QmfEvent(_map=_qmfevent1Map) - print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode()) + _schema = _sec, + _values = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) + + _qmfevent1Map = _qmfevent1.map_encode() + + _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) logging.info( "******** Messing around with Queries ********" ) diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py index d413358dd8..9683127a6e 100644 --- a/qpid/python/qmf/test/agent_test.py +++ b/qpid/python/qmf/test/agent_test.py @@ -4,10 +4,8 @@ from threading import Semaphore from qpid.messaging import * -from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty, - SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed, - QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, - QmfEvent, SchemaMethod, Notifier) +from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, + QmfEvent, SchemaMethod, Notifier, SchemaClassId) from qmfAgent import (Agent, QmfAgentData) @@ -20,9 +18,9 @@ class ExampleNotifier(Notifier): self._sema4.release() def waitForWork(self): - logging.error("Waiting for event...") + print("Waiting for event...") self._sema4.acquire() - logging.error("...event present") + print("...event present") @@ -31,58 +29,54 @@ class ExampleNotifier(Notifier): # _notifier = ExampleNotifier() -_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier ) +_agent = Agent( "qmf.testAgent", _notifier=_notifier ) # Dynamically construct a class schema -_schema = SchemaObjectClass( "MyPackage", "MyClass", - desc="A test data schema", - _pkey=["index1", "index2"] ) +_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) # add properties -_schema.addProperty( "index1", - SchemaProperty(qmfTypes.TYPE_UINT8)) -_schema.addProperty( "index2", - SchemaProperty(qmfTypes.TYPE_LSTR)) +_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) +_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + # these two properties are statistics -_schema.addProperty( "query_count", - SchemaProperty(qmfTypes.TYPE_UINT32)) -_schema.addProperty( "method_call_count", - SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call -_schema.addProperty( "set_string", - SchemaProperty(qmfTypes.TYPE_LSTR)) -_schema.addProperty( "set_int", - SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) +_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) # add method _meth = SchemaMethod( _desc="Method to set string and int in object." ) -_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) -_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) -_schema.addMethod( "set_meth", _meth ) +_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) +_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) +_schema.add_method( "set_meth", _meth ) # Add schema to Agent -_agent.registerObjectClass(_schema) +_agent.register_object_class(_schema) # instantiate managed data objects matching the schema -_obj = QmfAgentData( _agent, _schema ) -_obj.setProperty("index1", 100) -_obj.setProperty("index2", "a name" ) -_obj.setProperty("set_string", "UNSET") -_obj.setProperty("set_int", 0) -_obj.setProperty("query_count", 0) -_obj.setProperty("method_call_count", 0) -_agent.addObject( _obj ) - -_agent.addObject( QmfAgentData( _agent, _schema, - _props={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) +_obj = QmfAgentData( _agent, _schema=_schema ) +_obj.set_value("index1", 100) +_obj.set_value("index2", "a name" ) +_obj.set_value("set_string", "UNSET") +_obj.set_value("set_int", 0) +_obj.set_value("query_count", 0) +_obj.set_value("method_call_count", 0) +_agent.add_object( _obj ) + +_agent.add_object( QmfAgentData( _agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) ## Now connect to the broker @@ -100,18 +94,18 @@ while not _done: while _wi: print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) _agent.releaseWorkItem(_wi) - _wi = _agent.getNextWorkitem(timeout=0) + _wi = _agent.getNextWorkItem(timeout=0) except: - logging.info( "shutting down..." ) + print( "shutting down..." ) _done = True -logging.info( "Removing connection... TBD!!!" ) +print( "Removing connection... TBD!!!" ) #_myConsole.remove_connection( _c, 10 ) -logging.info( "Destroying agent... TBD!!!" ) +print( "Destroying agent... TBD!!!" ) #_myConsole.destroy( 10 ) -logging.info( "******** agent test done ********" ) +print( "******** agent test done ********" ) diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py index e649b2b8e4..6db515dc99 100644 --- a/qpid/python/qmf/test/console_test.py +++ b/qpid/python/qmf/test/console_test.py @@ -4,7 +4,7 @@ from threading import Semaphore from qpid.messaging import * -from qmfCommon import (Notifier, QmfQuery) +from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass) from qmfConsole import Console @@ -16,18 +16,18 @@ class ExampleNotifier(Notifier): self._sema4.release() def waitForWork(self): - logging.error("Waiting for event...") + print("Waiting for event...") self._sema4.acquire() - logging.error("...event present") + print("...event present") logging.getLogger().setLevel(logging.INFO) -logging.info( "Starting Connection" ) +print( "Starting Connection" ) _c = Connection("localhost") _c.connect() -logging.info( "Starting Console" ) +print( "Starting Console" ) _notifier = ExampleNotifier() _myConsole = Console(notifier=_notifier) @@ -40,30 +40,65 @@ _myConsole.addConnection( _c ) _query = {QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, QmfQuery._PREDICATE: - {QmfQuery._LOGIC_AND: - [{QmfQuery._CMP_EQ: ["vendor", "redhat.com"]}, - {QmfQuery._CMP_EQ: ["product", "qmf"]}]}} + {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}} _query = QmfQuery(_query) _myConsole.enableAgentDiscovery(_query) _done = False while not _done: - try: - _notifier.waitForWork() - - _wi = _myConsole.get_next_workitem(timeout=0) - while _wi: - print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _wi = _myConsole.get_next_workitem(timeout=0) - except: - logging.info( "shutting down..." ) - _done = True - -logging.info( "Removing connection" ) +# try: + _notifier.waitForWork() + + _wi = _myConsole.getNextWorkItem(timeout=0) + while _wi: + print("!!! work item received %d:%s" % (_wi.getType(), + str(_wi.getParams()))) + + + if _wi.getType() == _wi.AGENT_ADDED: + _agent = _wi.getParams().get("agent") + if not _agent: + print("!!!! AGENT IN REPLY IS NULL !!! ") + + _query = QmfQuery( {QmfQuery._TARGET: + {QmfQuery._TARGET_PACKAGES:None}} ) + + _reply = _myConsole.doQuery(_agent, _query) + + package_list = _reply.get(MsgKey.package_info) + for pname in package_list: + print("!!! Querying for schema from package: %s" % pname) + _query = QmfQuery({QmfQuery._TARGET: + {QmfQuery._TARGET_SCHEMA_ID:None}, + QmfQuery._PREDICATE: + {QmfQuery._CMP_EQ: + [SchemaClassId.KEY_PACKAGE, pname]}}) + + _reply = _myConsole.doQuery(_agent, _query) + + schema_id_list = _reply.get(MsgKey.schema_id) + for sid_map in schema_id_list: + _query = QmfQuery({QmfQuery._TARGET: + {QmfQuery._TARGET_SCHEMA:None}, + QmfQuery._PREDICATE: + {QmfQuery._CMP_EQ: + [SchemaClass.KEY_SCHEMA_ID, sid_map]}}) + + _reply = _myConsole.doQuery(_agent, _query) + + + + _myConsole.releaseWorkItem(_wi) + _wi = _myConsole.getNextWorkItem(timeout=0) +# except: +# logging.info( "shutting down..." ) +# _done = True + +print( "Removing connection" ) _myConsole.removeConnection( _c, 10 ) -logging.info( "Destroying console:" ) +print( "Destroying console:" ) _myConsole.destroy( 10 ) -logging.info( "******** console test done ********" ) +print( "******** console test done ********" ) -- cgit v1.2.1