From ac3d1986b5673b3de060ec781d206e57b1edd53a Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Thu, 10 Dec 2009 20:32:21 +0000 Subject: QPID-2261 - Patch from Ken Giusti git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@889416 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/Makefile | 1 + qpid/python/qmf/qmfAgent.py | 211 +++++ qpid/python/qmf/qmfCommon.py | 1792 ++++++++++++++++++++++++++++++++++++ qpid/python/qmf/qmfConsole.py | 1129 +++++++++++++++++++++++ qpid/python/qmf/test/agent_test.py | 58 ++ 5 files changed, 3191 insertions(+) create mode 100644 qpid/python/qmf/qmfAgent.py create mode 100644 qpid/python/qmf/qmfCommon.py create mode 100644 qpid/python/qmf/qmfConsole.py create mode 100644 qpid/python/qmf/test/agent_test.py diff --git a/qpid/python/Makefile b/qpid/python/Makefile index ff4a9af4f1..b5e48653d0 100644 --- a/qpid/python/Makefile +++ b/qpid/python/Makefile @@ -51,6 +51,7 @@ build: $(TARGETS) doc: @mkdir -p $(BUILD) epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log + epydoc qmf.qmfCommon qmf.qmfConsole -o $(BUILD)/doc/qmf --no-private --no-sourcecode --include-log install: build install -d $(PYTHON_LIB) diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py new file mode 100644 index 0000000000..59731a234b --- /dev/null +++ b/qpid/python/qmf/qmfAgent.py @@ -0,0 +1,211 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import socket +import os +import logging +from threading import Thread +from qpid.messaging import Connection, Message +from uuid import uuid4 +from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, + AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged) + + + + ##============================================================================== + ## AGENT + ##============================================================================== + +class Agent(Thread): + def __init__(self, vendor, product, name=None, + notifier=None, kwargs={}): + Thread.__init__(self) + self._running = False + self.vendor = vendor + self.product = product + if name: + self.name = name + else: + self.name = uuid4().get_urn().split(":")[2] + self._id = AgentId(self.vendor, self.product, self.name) + self._address = str(self._id) + self._notifier = notifier + self._conn = None + + def getAgentId(self): + return AgentId(self.vendor, self.product, self.name) + + def setConnection(self, conn): + self._conn = conn + self._session = self._conn.session() + self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE) + self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address) + self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) + self._running = True + self._start() + + + def _dispatch(self, msg): + if msg.subject != "qmf4": + logging.debug("Ignoring non-qmf message '%s'" % msg.subject) + return + + cmap = {} + if msg.content_type == "amqp/map": + cmap = msg.content + + if (not msg.properties or + not "method" in msg.properties or + not "opcode" in msg.properties): + logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties)) + return + + if msg.properties["method"] == "request": + if msg.properties["opcode"] == "agent-locate": + if "query" in cmap: + query = cmap["query"] + if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and + "product" in query and (query["product"] == "*" or query["product"] == self.product) and + "name" in query and (query["name"] == "*" or query["name"] == self.name)): + logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name)) + logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id)) + try: + tmp_snd = self.session.sender( msg.reply_to ) + m = Message( subject="qmf4", + properties={"method":"response", + "opcode":"agent"}, + content={"name": {"vendor":"redhat.com", + "product":"agent", + "name":"tross"}}, + correlation_id=msg.correlation_id) + tmp_snd.send(m) + logging.debug("reply-to [%s] sent" % msg.reply_to) + except e: + logging.error("Failed to send reply to msg '%s'" % str(e)) + + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % msg.properties["opcode"]) + else: + logging.warning("Ignoring message with unrecognized 'method' value: '%s'" + % msg.properties["method"] ) + + + + def run(self): + count = 0 # @todo: hack + while self._running: + try: + msg = self._locate_receiver.fetch(1) + if msg.content_type == "amqp/map": + self._dispatch(msg) + except KeyboardInterrupt: + break + except: + pass + + try: + msg = self._direct_receiver.fetch(1) + if msg.content_type == "amqp/map": + self._dispatch(msg) + except KeyboardInterrupt: + break + except: + pass + + count+= 1 + if count == 5: + count = 0 + m = Message( subject="qmf4", + properties={"method":"indication", + "opcode":"agent"}, + content={"name": {"vendor":"redhat.com", + "product":"agent", + "name":"tross"}} ) + self.ind_sender.send(m) + logging.info("Agent Indication Sent") + + + def registerObjectClass(self, cls): + logging.error("!!!Agent.registerObjectClass() TBD!!!") + + def registerEventClass(self, cls): + logging.error("!!!Agent.registerEventClass() TBD!!!") + + def raiseEvent(self, qmfEvent): + logging.error("!!!Agent.raiseEvent() TBD!!!") + + def addObject(self, qmfAgentData ): + logging.error("!!!Agent.addObject() TBD!!!") + + def methodResponse(self, context, status, text, arguments): + logging.error("!!!Agent.methodResponse() TBD!!!") + + def getWorkItemCount(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + logging.error("!!!Agent.getWorkItemCount() TBD!!!") + + def getNextWorkItem(self, timeout=None): + """ + Obtains the next pending work item, or None if none available. + """ + logging.error("!!!Agent.getNextWorkItem() TBD!!!") + + def releaseWorkItem(self, wi): + """ + Releases a WorkItem instance obtained by getNextWorkItem(). Called when + the application has finished processing the WorkItem. + """ + logging.error("!!!Agent.releaseWorkItem() TBD!!!") + + + + + ##============================================================================== + ## OBJECTS + ##============================================================================== + + +class QmfAgentData(QmfManaged): + """ + A managed data object that is owned by an agent. + """ + def __init__(self, _agent, _schema, _props={}): + """ + @type _agent: class Agent + @param _agent: the agent that manages this object. + @type _schema: class SchemaObjectClass + @param _schema: the schema used to describe this data object + @type _props: map of "name"= pairs + @param _props: initial values for all properties in this object + """ + super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(), + _schema=_schema, + _props=_props) + + def destroy(self): + self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000) + # @todo: publish change + + def setProperty( self, _name, _value): + super(QmfAgentData, self).setProperty(_name, _value) + # @todo: publish change diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py new file mode 100644 index 0000000000..66162a18bb --- /dev/null +++ b/qpid/python/qmf/qmfCommon.py @@ -0,0 +1,1792 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import socket +import os +import time +import logging +from threading import Lock +from threading import Condition +try: + import hashlib + _md5Obj = hashlib.md5 +except ImportError: + import md5 + _md5Obj = md5.new + + + + +## +## Constants +## + +AMQP_QMF_TOPIC = "amq.topic" +AMQP_QMF_DIRECT = "amq.direct" +AMQP_QMF_NAME_SEPARATOR = "/" +AMQP_QMF_AGENT_LOCATE = "amq.topic/agent.locate" +AMQP_QMF_AGENT_INDICATION = "amq.topic/agent.ind" + + +##============================================================================== +## Agent Identification +##============================================================================== + +class AgentId(object): + """ + Uniquely identifies a management agent within the entire management domain. + + Map format: + map["vendor"] = str, name of vendor of the agent + map["product"] = str, name of product using agent + map["name"] = str, name of agent, unique within vendor and product. + """ + _separator = ":" + def __init__(self, vendor, product, name): + """ + Note: this object must be immutable, as it is used to index into a dictionary + """ + self._vendor = vendor + self._product = product + self._name = name + + def vendor(self): + return self._vendor + + def product(self): + return self._product + + def name(self): + return self._name + + def mapEncode(self): + _map = {} + _map["vendor"] = self._vendor + _map["product"] = self._product + _map["name"] = self._name + return _map + + def __cmp__(self, other): + if not isinstance(other, AgentId) : + raise TypeError("Invalid types for compare") + # return 1 + me = str(self) + them = str(other) + + if me < them: + return -1 + if me > them: + return 1 + return 0 + + def __hash__(self): + return (self._vendor, self._product, self._name).__hash__() + + def __repr__(self): + return self._vendor + AgentId._separator + \ + self._product + AgentId._separator + \ + self._name + + def __str__(self): + return self.__repr__() + + + +def AgentIdFactory( param ): + """ + Factory for constructing an AgentId class from various sources. + + @type param: various + @param param: object to use for constructing AgentId + @rtype: AgentId + @returns: a new AgentId instance + """ + if type(param) == str: + if param.count(AgentId._separator) < 2: + raise TypeError("AgentId string format must be 'vendor:product:name'") + vendor, product, name = param.split(AgentId._separator) + return AgentId(vendor, product, name) + if type(param) == dict: + # construct from map encoding + if not "vendor" in param: + raise TypeError("requires 'vendor' value") + if not "product" in param: + raise TypeError("requires 'product' value") + if not "name" in param: + raise TypeError("requires 'name' value") + return AgentId( param["vendor"], param["product"], param["name"] ) + raise TypeError("Invalid type for AgentId construction") + + + +##============================================================================== +## DATA MODEL +##============================================================================== + + + +class ObjectId(object): + """ + An instance of managed object must be uniquely identified within the + management system. Each managed object is given a key that is unique + within the domain of the object's managing Agent. Note that these + keys are not unique across Agents. Therefore, a globally unique name + for an instance of a managed object is the concatenation of the + object's key and the managing Agent's AgentId. + + Map format: + map["agent_id"] = map representation of AgentId + map["primary_key"] = str, key for managed object + """ + def __init__(self, agentid, name): + if not isinstance(agentid, AgentId): + raise TypeError("requires an AgentId class") + self._agentId = agentid; + self._name = name; + + def getAgentId(self): + """ + @rtype: class AgentId + @returns: Id of agent that manages the object. + """ + return self._agentId + + def getPrimaryKey(self): + """ + @rtype: str + @returns: Key of managed object. + """ + return self._name + + def mapEncode(self): + _map = {} + _map["agent_id"] = self._agentId.mapEncode() + _map["primary_key"] = self._name + return _map + + def __repr__(self): + return "%s:%s" % (self._agentId, self._name) + + def __cmp__(self, other): + if not isinstance(other, ObjectId) : + raise TypeError("Invalid types for compare") + + if self._agentId < other._agentId: + return -1 + if self._agentId > other._agentId: + return 1 + if self._name < other._name: + return -1 + if self._name > other._name: + return 1 + return 0 + + def __hash__(self): + return (hash(self._agentId), self._name).__hash__() + + +def ObjectIdFactory( param ): + """ + Factory for constructing ObjectIds from various sources + + @type param: various + @param param: object to use for constructing ObjectId + @rtype: ObjectId + @returns: a new ObjectId instance + """ + if type(param) == dict: + # construct from map + if "agent_id" not in param: + raise TypeError("requires 'agent_id' value") + if "primary_key" not in param: + raise TypeError("requires 'primary_key' value") + + return ObjectId( AgentIdFactory(param["agent_id"]), param["primary_key"] ) + + else: + raise TypeError("Invalid type for ObjectId construction") + + + + +class QmfData(object): + """ + Base data class representing arbitrarily structure data. No schema or + managing agent is associated with data of this class. + + Map format: + map["properties"] = map of unordered "name"= pairs (optional) + """ + def __init__(self, _props={}, _const=False): + """ + @type _props: dict + @param _props: dictionary of initial name=value pairs for object's property data. + @type _const: boolean + @param _const: if true, this object cannot be modified + """ + self._properties = _props.copy() + self._const = _const + self._managed = False # not managed by an Agent + self._described = False # not described by a schema + + def isManaged(self): + return self._managed + + def isDescribed(self): + return self._described + + def getProperties(self): + return self._properties.copy() + + def getProperty(self, _name): + return self._properties[_name] + + def setProperty(self, _name, _value): + if self._const: + raise Exception("cannot modify constant data object") + self._properties[_name] = _value + return _value + + def mapEncode(self): + return self._properties.copy() + + def __repr__(self): + return str(self.mapEncode()) + + def __setattr__(self, _name, _value): + # ignore private data members + if _name[0] == '_': + return super.__setattr__(self, _name, _value) + # @todo: this is bad - what if the same name is used for a + # property and statistic or argument? + if _name in self._properties: + return self.setProperty(_name, _value) + return super.__setattr__(self, _name, _value) + + def __getattr__(self, _name): + # @todo: this is bad - what if the same name is used for a + # property and statistic or argument? + if _name in self._properties: return self.getProperty(_name) + raise AttributeError("no item named '%s' in this object" % _name) + + def __getitem__(self, _name): + return self.__getattr__(_name) + + def __setitem__(self, _name, _value): + return self.__setattr__(_name, _value) + + +def QmfDataFactory( param, const=False ): + """ + Factory for constructing an QmfData class from various sources. + + @type param: various + @param param: object to use for constructing QmfData instance + @rtype: QmfData + @returns: a new QmfData instance + """ + if type(param) == dict: + # construct from map + return QmfData( _props=param, _const=const ) + + else: + raise TypeError("Invalid type for QmfData construction") + + + +class QmfDescribed(QmfData): + """ + Data that has a formally defined structure is represented by the + QmfDescribed class. This class extends the QmfData class by + associating the data with a formal schema (SchemaObjectClass). + + Map format: + map["schema_id"] = map representation of a SchemaClassId instance + map["properties"] = map representation of a QmfData instance + """ + def __init__(self, _schema=None, _schemaId=None, _props={}, _const=False ): + """ + @type _schema: class SchemaClass or derivative + @param _schema: an instance of the schema used to describe this object. + @type _schemaId: class SchemaClassId + @param _schemaId: if schema instance not available, this is mandatory. + @type _props: dict + @param _props: dictionary of initial name=value pairs for object's property data. + @type _const: boolean + @param _const: if true, this object cannot be modified + """ + super(QmfDescribed, self).__init__(_props, _const) + self._validated = False + self._described = True + self._schema = _schema + if _schema: + self._schemaId = _schema.getClassId() + if self._const: + self._validate() + else: + if _schemaId: + self._schemaId = _schemaId + else: + raise Exception("A SchemaClass or SchemaClassId must be provided") + + + def getSchemaClassId(self): + """ + @rtype: class SchemaClassId + @returns: the identifier of the Schema that describes the structure of the data. + """ + return self._schemaId + + def setSchema(self, _schema): + """ + @type _schema: class SchemaClass or derivative + @param _schema: instance of schema used to describe the structure of the data. + """ + if self._schemaId != _schema.getClassId(): + raise Exception("Cannot reset current schema to a different schema") + oldSchema = self._schema + self._schema = _schema + if not oldSchema and self._const: + self._validate() + + def getPrimaryKey(self): + """ + Get a string composed of the object's primary key properties. + @rtype: str + @returns: a string composed from primary key property values. + """ + if not self._schema: + raise Exception("schema not available") + + if not self._validated: + self._validate() + + if self._schema._pkeyNames == 0: + if len(self._properties) != 1: + raise Exception("no primary key defined") + return str(self._properties.values()[0]) + + result = u"" + for pkey in self._schema._pkeyNames: + if result != u"": + result += u":" + try: + valstr = unicode(self._properties[pkey]) + except: + valstr = u"" + result += valstr + return result + + + def mapEncode(self): + _map = {} + _map["schema_id"] = self._schemaId.mapEncode() + _map["properties"] = super(QmfDescribed, self).mapEncode() + return _map + + def _validate(self): + """ + Compares this object's data against the associated schema. Throws an + exception if the data does not conform to the schema. + """ + for name,val in self._schema._properties.iteritems(): + # @todo validate: type compatible with amqp_type? + # @todo validate: primary keys have values + if name not in self._properties: + if val._isOptional: + # ok not to be present, put in dummy value + # to simplify access + self._properties[name] = None + else: + raise Exception("Required property '%s' not present." % name) + self._validated = True + + + +def QmfDescribedFactory( param, _schema=None ): + """ + Factory for constructing an QmfDescribed class from various sources. + + @type param: various + @param param: object to use for constructing QmfDescribed instance + @type _schema: SchemaClass + @param _schema: instance of the SchemaClass that describes this instance + @rtype: QmfDescribed + @returns: a new QmfDescribed instance + """ + if type(param) == dict: + # construct from map + if "schema_id" not in param: + raise TypeError("requires 'schema_id' value") + if "properties" not in param: + raise TypeError("requires 'properties' value") + + return QmfDescribed( _schema=_schema, _schemaId=SchemaClassIdFactory( param["schema_id"] ), + _props = param["properties"] ) + else: + raise TypeError("Invalid type for QmfDescribed construction") + + + +class QmfManaged(QmfDescribed): + """ + Data that has a formally defined structure, and for which each + instance of the data is managed by a particular Agent is represented + by the QmfManaged class. This class extends the QmfDescribed class by + associating the described data with a particular Agent. + + Map format: + map["object_id"] = map representation of an ObjectId value + map["schema_id"] = map representation of a SchemaClassId instance + map["qmf_data"] = map representation of a QmfData instance + map["timestamps"] = array of AMQP timestamps. [0] = time of last update, + [1] = creation timestamp, [2] = deletion timestamp or zero. + """ + _ts_update = 0 + _ts_create = 1 + _ts_delete = 2 + def __init__( self, _agentId=None, _schema=None, _schemaId=None, + _props={}, _const=False ): + """ + @type _agentId: class AgentId + @param _agentId: globally unique identifier of the managing agent. + @type _schema: class SchemaClass or derivative + @param _schema: an instance of the schema used to describe this object. + @type _schemaId: class SchemaClassId + @param _schemaId: if schema instance not available, this is mandatory. + @type _props: dict + @param _props: dictionary of initial name=value pairs for object's property data. + @type _const: boolean + @param _const: if true, this object cannot be modified + """ + super(QmfManaged, self).__init__(_schema=_schema, _schemaId=_schemaId, + _props=_props, _const=_const) + self._managed = True + self._agentId = _agentId + # timestamp, in millisec since epoch UTC + _ctime = long(time.time() * 1000) + self._timestamps = [_ctime,_ctime,0] + + + def getObjectId(self): + """ + @rtype: class ObjectId + @returns: the ObjectId that uniquely identifies this managed object. + """ + return ObjectId(self._agentId, self.getPrimaryKey()) + + def isDeleted(self): + return self._timestamps[QmfManaged._ts_delete] == 0 + + def mapEncode(self): + _map = super(QmfManaged, self).mapEncode() + _map["agent_id"] = self._agentId.mapEncode() + _map["timestamps"] = self._timestamps[:] + return _map + + + +def QmfManagedFactory( param, _schema=None ): + """ + Factory for constructing an QmfManaged instance from various sources. + + @type param: various + @param param: object to use for constructing QmfManaged instance + @type _schema: SchemaClass + @param _schema: instance of the SchemaClass that describes this instance + @rtype: QmfManaged + @returns: a new QmfManaged instance + """ + if type(param) == dict: + # construct from map + if "agent_id" not in param: + raise TypeError("requires 'agent_id' value") + _qd = QmfDescribedFactory( param, _schema ) + + return QmfManaged( _agentId = AgentIdFactory(param["agent_id"]), + _schema=_schema, _schemaId=_qd._schemaId, + _props = _qd._properties ) + else: + raise TypeError("Invalid type for QmfManaged construction") + + + +class QmfEvent(QmfDescribed): + """ + A QMF Event is a type of described data that is not managed. Events are + notifications that are sent by Agents. An event notifies a Console of a + change in some aspect of the system under managment. + """ + def __init__(self, _map=None, + _timestamp=None, _agentId=None, + _schema=None, _schemaId=None, + _props={}, _const=False): + """ + @type _map: dict + @param _map: if not None, construct instance from map representation. + @type _timestamp: int + @param _timestamp: moment in time when event occurred, expressed + as milliseconds since Midnight, Jan 1, 1970 UTC. + @type _agentId: class AgentId + @param _agentId: Identifies agent issuing this event. + @type _schema: class Schema + @param _schema: + @type _schemaId: class SchemaClassId (event) + @param _schemaId: identi + """ + if _map: + if type(_map) != dict: + raise TypeError("parameter '_map' must be of type 'dict'") + if "timestamp" not in _map: + pass + if "agent_id" not in _map: + pass + _qe = QmfDescribedFactory( _map, _schema ) + super(QmfEvent, self).__init__( _schema=_qe._schema, _schemaId=_qe._schemaId, + _props=_qe._properties, _const=_qe._const ) + self._timestamp = long(_map["timestamp"]) + self._agentId = AgentIdFactory(_map["agent_id"]) + else: + super(QmfEvent, self).__init__(_schema=_schema, _schemaId=_schemaId, + _props=_props, _const=_const) + self._timestamp = long(_timestamp) + self._agentId = _agentId + + def getTimestamp(self): return self._timestamp + + def getAgentId(self): return self._agentId + + def mapEncode(self): + _map = super(QmfEvent, self).mapEncode() + _map["timestamp"] = self._timestamp + _map["agent_id"] = self._agentId.mapEncode() + return _map + + + +#============================================================================== +#============================================================================== +#============================================================================== + + + +class QmfObject(object): + # attr_reader :impl, :object_class + def __init__(self, cls, kwargs={}): + pass +# self._cv = Condition() +# self._sync_count = 0 +# self._sync_result = None +# self._allow_sets = False +# if kwargs.has_key("broker"): +# self._broker = kwargs["broker"] +# else: +# self._broker = None +# if cls: +# self.object_class = cls +# self.impl = qmfengine.Object(self.object_class.impl) +# elif kwargs.has_key("impl"): +# self.impl = qmfengine.Object(kwargs["impl"]) +# self.object_class = SchemaObjectClass(None, +# None, +# {"impl":self.impl.getClass()}) +# else: +# raise Exception("Argument error: required parameter ('impl') not supplied") + + +# def destroy(self): +# self.impl.destroy() + + +# def object_id(self): +# return ObjectId(self.impl.getObjectId()) + + +# def set_object_id(self, oid): +# self.impl.setObjectId(oid.impl) + + +# def properties(self): +# list = [] +# for prop in self.object_class.properties: +# list.append([prop, self.get_attr(prop.name())]) +# return list + + +# def statistics(self): +# list = [] +# for stat in self.object_class.statistics: +# list.append([stat, self.get_attr(stat.name())]) +# return list + + +# def get_attr(self, name): +# val = self._value(name) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.asUint() +# elif vType == TYPE_UINT16: return val.asUint() +# elif vType == TYPE_UINT32: return val.asUint() +# elif vType == TYPE_UINT64: return val.asUint64() +# elif vType == TYPE_SSTR: return val.asString() +# elif vType == TYPE_LSTR: return val.asString() +# elif vType == TYPE_ABSTIME: return val.asInt64() +# elif vType == TYPE_DELTATIME: return val.asUint64() +# elif vType == TYPE_REF: return ObjectId(val.asObjectId()) +# elif vType == TYPE_BOOL: return val.asBool() +# elif vType == TYPE_FLOAT: return val.asFloat() +# elif vType == TYPE_DOUBLE: return val.asDouble() +# elif vType == TYPE_UUID: return val.asUuid() +# elif vType == TYPE_INT8: return val.asInt() +# elif vType == TYPE_INT16: return val.asInt() +# elif vType == TYPE_INT32: return val.asInt() +# elif vType == TYPE_INT64: return val.asInt64() +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) +# return None + + +# def set_attr(self, name, v): +# val = self._value(name) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.setUint(v) +# elif vType == TYPE_UINT16: return val.setUint(v) +# elif vType == TYPE_UINT32: return val.setUint(v) +# elif vType == TYPE_UINT64: return val.setUint64(v) +# elif vType == TYPE_SSTR: +# if v: return val.setString(v) +# else: return val.setString('') +# elif vType == TYPE_LSTR: +# if v: return val.setString(v) +# else: return val.setString('') +# elif vType == TYPE_ABSTIME: return val.setInt64(v) +# elif vType == TYPE_DELTATIME: return val.setUint64(v) +# elif vType == TYPE_REF: return val.setObjectId(v.impl) +# elif vType == TYPE_BOOL: return val.setBool(v) +# elif vType == TYPE_FLOAT: return val.setFloat(v) +# elif vType == TYPE_DOUBLE: return val.setDouble(v) +# elif vType == TYPE_UUID: return val.setUuid(v) +# elif vType == TYPE_INT8: return val.setInt(v) +# elif vType == TYPE_INT16: return val.setInt(v) +# elif vType == TYPE_INT32: return val.setInt(v) +# elif vType == TYPE_INT64: return val.setInt64(v) +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) +# return None + + +# def __getitem__(self, name): +# return self.get_attr(name) + + +# def __setitem__(self, name, value): +# self.set_attr(name, value) + + +# def inc_attr(self, name, by=1): +# self.set_attr(name, self.get_attr(name) + by) + + +# def dec_attr(self, name, by=1): +# self.set_attr(name, self.get_attr(name) - by) + + + + +# def _invokeMethod(self, name, argMap): +# """ +# Private: Helper function that invokes an object's method, and waits for the result. +# """ +# self._cv.acquire() +# try: +# timeout = 30 +# self._sync_count = 1 +# self.impl.invokeMethod(name, argMap, self) +# if self._broker: +# self._broker.conn.kick() +# self._cv.wait(timeout) +# if self._sync_count == 1: +# raise Exception("Timed out: waiting for response to method call.") +# finally: +# self._cv.release() + +# return self._sync_result + + +# def _method_result(self, result): +# """ +# Called to return the result of a method call on an object +# """ +# self._cv.acquire(); +# try: +# self._sync_result = result +# self._sync_count -= 1 +# self._cv.notify() +# finally: +# self._cv.release() + + +# def _marshall(schema, args): +# ''' +# Private: Convert a list of arguments (positional) into a Value object of type "map". +# Used to create the argument parameter for an object's method invokation. +# ''' +# # Build a map of the method's arguments +# map = qmfengine.Value(TYPE_MAP) +# for arg in schema.arguments: +# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: +# map.insert(arg.name, qmfengine.Value(arg.typecode)) + +# # install each argument's value into the map +# marshalled = Arguments(map) +# idx = 0 +# for arg in schema.arguments: +# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: +# if args[idx]: +# marshalled[arg.name] = args[idx] +# idx += 1 + +# return marshalled.map + + +# def _value(self, name): +# val = self.impl.getValue(name) +# if not val: +# raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % +# (name, +# self.object_class.impl.getClassKey().getPackageName(), +# self.object_class.impl.getClassKey().getClassName())) +# return val + + + + + +class Arguments(object): + def __init__(self, map): + pass +# self.map = map +# self._by_hash = {} +# key_count = self.map.keyCount() +# a = 0 +# while a < key_count: +# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a)) +# a += 1 + + +# def __getitem__(self, key): +# return self._by_hash[key] + + +# def __setitem__(self, key, value): +# self._by_hash[key] = value +# self.set(key, value) + + +# def __iter__(self): +# return self._by_hash.__iter__ + + +# def __getattr__(self, name): +# if name in self._by_hash: +# return self._by_hash[name] +# return super.__getattr__(self, name) + + +# def __setattr__(self, name, value): +# # +# # ignore local data members +# # +# if (name[0] == '_' or +# name == 'map'): +# return super.__setattr__(self, name, value) + +# if name in self._by_hash: +# self._by_hash[name] = value +# return self.set(name, value) + +# return super.__setattr__(self, name, value) + + +# def by_key(self, key): +# val = self.map.byKey(key) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.asUint() +# elif vType == TYPE_UINT16: return val.asUint() +# elif vType == TYPE_UINT32: return val.asUint() +# elif vType == TYPE_UINT64: return val.asUint64() +# elif vType == TYPE_SSTR: return val.asString() +# elif vType == TYPE_LSTR: return val.asString() +# elif vType == TYPE_ABSTIME: return val.asInt64() +# elif vType == TYPE_DELTATIME: return val.asUint64() +# elif vType == TYPE_REF: return ObjectId(val.asObjectId()) +# elif vType == TYPE_BOOL: return val.asBool() +# elif vType == TYPE_FLOAT: return val.asFloat() +# elif vType == TYPE_DOUBLE: return val.asDouble() +# elif vType == TYPE_UUID: return val.asUuid() +# elif vType == TYPE_INT8: return val.asInt() +# elif vType == TYPE_INT16: return val.asInt() +# elif vType == TYPE_INT32: return val.asInt() +# elif vType == TYPE_INT64: return val.asInt64() +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType())) +# return None + + +# def set(self, key, value): +# val = self.map.byKey(key) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.setUint(value) +# elif vType == TYPE_UINT16: return val.setUint(value) +# elif vType == TYPE_UINT32: return val.setUint(value) +# elif vType == TYPE_UINT64: return val.setUint64(value) +# elif vType == TYPE_SSTR: +# if value: +# return val.setString(value) +# else: +# return val.setString('') +# elif vType == TYPE_LSTR: +# if value: +# return val.setString(value) +# else: +# return val.setString('') +# elif vType == TYPE_ABSTIME: return val.setInt64(value) +# elif vType == TYPE_DELTATIME: return val.setUint64(value) +# elif vType == TYPE_REF: return val.setObjectId(value.impl) +# elif vType == TYPE_BOOL: return val.setBool(value) +# elif vType == TYPE_FLOAT: return val.setFloat(value) +# elif vType == TYPE_DOUBLE: return val.setDouble(value) +# elif vType == TYPE_UUID: return val.setUuid(value) +# elif vType == TYPE_INT8: return val.setInt(value) +# elif vType == TYPE_INT16: return val.setInt(value) +# elif vType == TYPE_INT32: return val.setInt(value) +# elif vType == TYPE_INT64: return val.setInt64(value) +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error("Unsupported Type for Set? '%s'" % str(val.getType())) +# return None + + + +class MethodResponse(object): + def __init__(self, impl): + pass +# self.impl = qmfengine.MethodResponse(impl) + + +# def status(self): +# return self.impl.getStatus() + + +# def exception(self): +# return self.impl.getException() + + +# def text(self): +# return exception().asString() + + +# def args(self): +# return Arguments(self.impl.getArgs()) + + +# def __getattr__(self, name): +# myArgs = self.args() +# return myArgs.__getattr__(name) + + +# def __setattr__(self, name, value): +# if name == 'impl': +# return super.__setattr__(self, name, value) + +# myArgs = self.args() +# return myArgs.__setattr__(name, value) + + + +# ##============================================================================== +# ## QUERY +# ##============================================================================== + + +class Query: + def __init__(self, kwargs={}): + pass +# if "impl" in kwargs: +# self.impl = kwargs["impl"] +# else: +# package = '' +# if "key" in kwargs: +# # construct using SchemaClassKey: +# self.impl = qmfengine.Query(kwargs["key"]) +# elif "object_id" in kwargs: +# self.impl = qmfengine.Query(kwargs["object_id"].impl) +# else: +# if "package" in kwargs: +# package = kwargs["package"] +# if "class" in kwargs: +# self.impl = qmfengine.Query(kwargs["class"], package) +# else: +# raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") + + +# def package_name(self): return self.impl.getPackage() +# def class_name(self): return self.impl.getClass() +# def object_id(self): +# _objid = self.impl.getObjectId() +# if _objid: +# return ObjectId(_objid) +# else: +# return None + + +##============================================================================== +## SCHEMA +##============================================================================== + +# known schema types + +SchemaTypeData = "data" +SchemaTypeEvent = "event" + +# format convention for schema hash + +_schemaHashStrFormat = "%08x-%08x-%08x-%08x" +_schemaHashStrDefault = "00000000-00000000-00000000-00000000" + +# Argument typecodes, access, and direction qualifiers + +class qmfTypes(object): + TYPE_UINT8 = 1 + TYPE_UINT16 = 2 + TYPE_UINT32 = 3 + TYPE_UINT64 = 4 + TYPE_SSTR = 6 + TYPE_LSTR = 7 + TYPE_ABSTIME = 8 + TYPE_DELTATIME = 9 + TYPE_REF = 10 + TYPE_BOOL = 11 + TYPE_FLOAT = 12 + TYPE_DOUBLE = 13 + TYPE_UUID = 14 + TYPE_MAP = 15 + TYPE_INT8 = 16 + TYPE_INT16 = 17 + TYPE_INT32 = 18 + TYPE_INT64 = 19 + TYPE_OBJECT = 20 + TYPE_LIST = 21 + TYPE_ARRAY = 22 + + +class qmfAccess(object): + READ_CREATE = 1 + READ_WRITE = 2 + READ_ONLY = 3 + + +class qmfDirection(object): + DIR_IN = 1 + DIR_OUT = 2 + DIR_IN_OUT = 3 + + + +def _toBool( param ): + """ + Helper routine to convert human-readable representations of + boolean values to python bool types. + """ + _false_strings = ["off", "no", "false", "0", "none"] + _true_strings = ["on", "yes", "true", "1"] + if type(param) == str: + lparam = param.lower() + if lparam in _false_strings: + return False + if lparam in _true_strings: + return True + raise TypeError("unrecognized boolean string: '%s'" % param ) + else: + return bool(param) + + + +class SchemaClassId(object): + """ + Unique identifier for an instance of a SchemaClass. + + Map format: + map["package_name"] = str, name of associated package + map["class_name"] = str, name of associated class + map["type"] = str, "data"|"event", default: "data" + optional: + map["hash_str"] = str, hash value in standard format or None + if hash is unknown. + """ + def __init__(self, pname, cname, stype=SchemaTypeData, hstr=None): + """ + @type pname: str + @param pname: the name of the class's package + @type cname: str + @param cname: name of the class + @type stype: str + @param stype: schema type [SchemaTypeData | SchemaTypeEvent] + @type hstr: str + @param hstr: the hash value in '%08x-%08x-%08x-%08x' format + """ + self._pname = pname + self._cname = cname + if stype != SchemaTypeData and stype != SchemaTypeEvent: + raise TypeError("Invalid SchemaClassId type: '%s'" % stype) + self._type = stype + self._hstr = hstr + if self._hstr: + try: + # sanity check the format of the hash string + hexValues = hstr.split("-") + h0 = int(hexValues[0], 16) + h1 = int(hexValues[1], 16) + h2 = int(hexValues[2], 16) + h3 = int(hexValues[3], 16) + except: + raise Exception("Invalid SchemaClassId format: bad hash string: '%s':" + % hstr) + + def getPackageName(self): + """ + Access the package name in the SchemaClassId. + + @rtype: str + """ + return self._pname + + + def getClassName(self): + """ + Access the class name in the SchemaClassId + + @rtype: str + """ + return self._cname + + + def getHashString(self): + """ + Access the schema's hash as a string value + + @rtype: str + """ + return self._hstr + + + def getType(self): + """ + Returns the type code associated with this Schema + + @rtype: str + """ + return self._type + + def mapEncode(self): + _map = {} + _map["package_name"] = self._pname + _map["class_name"] = self._cname + _map["type"] = self._type + if self._hstr: _map["hash_str"] = self._hstr + return _map + + def __repr__(self): + if self._type == SchemaTypeEvent: + stype = "event" + else: + stype = "data" + hstr = self.getHashString() + if not hstr: + hstr = _schemaHashStrDefault + return self._pname + ":" + self._cname + ":" + stype + "(" + hstr + ")" + + + def __cmp__(self, other): + if not isinstance(other, SchemaClassId) : + raise TypeError("Invalid types for compare") + # return 1 + me = str(self) + them = str(other) + if me < them: + return -1 + if me > them: + return 1 + return 0 + + + def __hash__(self): + return (self._pname, self._cname, self._hstr).__hash__() + + + +def SchemaClassIdFactory( param ): + """ + Factory for constructing SchemaClassIds from various sources + + @type param: various + @param param: object to use for constructing SchemaClassId + @rtype: SchemaClassId + @returns: a new SchemaClassId instance + """ + if type(param) == str: + # construct from __repr__/__str__ representation + try: + pname, cname, rest = param.split(":") + stype,rest = rest.split("("); + hstr = rest.rstrip(")"); + if hstr == _schemaHashStrDefault: + hstr = None + return SchemaClassId( pname, cname, stype, hstr ) + except: + raise TypeError("Invalid string format: '%s'" % param) + + if type(param) == dict: + # construct from map representation + if "package_name" in param: + pname = param["package_name"] + else: + raise TypeError("'package_name' attribute is required") + + if "class_name" in param: + cname = param["class_name"] + else: + raise TypeError("'class_name' attribute is required") + + hstr = None + if "hash_str" in param: + hstr = param["hash_str"] + + stype = "data" + if "type" in param: + stype = param["type"] + + return SchemaClassId( pname, cname, stype, hstr ) + + else: + raise TypeError("Invalid type for SchemaClassId construction") + + + +class SchemaProperty(object): + """ + Describes the structure of a Property data object. + Map format: + map["amqp_type"] = int, AMQP type code indicating property's data type + + optional: + map["access"] = str, access allowed to this property, default "RO" + map["index"] = bool, True if this property is an index value, default False + map["optional"] = bool, True if this property is optional, default False + map["unit"] = str, describes units used + map["min"] = int, minimum allowed value + map["max"] = int, maximun allowed value + map["maxlen"] = int, if string type, this is the maximum length in bytes + required to represent the longest instance of this string. + map["desc"] = str, human-readable description of this argument + map["reference"] = str, ??? + map["parent_ref"] = bool, true if this property references an object in + which this object is in a child-parent relationship. Default False + """ + __hash__ = None + _access_strings = ["RO","RW","RC"] + def __init__(self, typeCode, kwargs={}): + self._type = typeCode + self._access = "RO" + self._isIndex = False + self._isOptional = False + self._unit = None + self._min = None + self._max = None + self._maxlen = None + self._desc = None + self._reference = None + self._isParentRef = False + + for key, value in kwargs.items(): + if key == "access": + value = str(value).upper() + if value not in self._access_strings: + raise TypeError("invalid value for access parameter: '%s':" % value ) + self._access = value + elif key == "index" : self._isIndex = _toBool(value) + elif key == "optional": self._isOptional = _toBool(value) + elif key == "unit" : self._unit = value + elif key == "min" : self._min = value + elif key == "max" : self._max = value + elif key == "maxlen" : self._maxlen = value + elif key == "desc" : self._desc = value + elif key == "reference" : self._reference = value + elif key == "parent_ref" : self._isParentRef = _toBool(value) + + def getType(self): return self._type + + def getAccess(self): return self._access + + def isOptional(self): return self._isOptional + + def isIndex(self): return self._isIndex + + def getUnit(self): return self._unit + + def getMin(self): return self._min + + def getMax(self): return self._max + + def getMaxLen(self): return self._maxlen + + def getDesc(self): return self._desc + + def getReference(self): return self._reference + + def isParentRef(self): return self._isParentRef + + def mapEncode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _map["amqp_type"] = self._type + _map["access"] = self._access + _map["index"] = self._isIndex + _map["optional"] = self._isOptional + if self._unit: _map["unit"] = self._unit + if self._min: _map["min"] = self._min + if self._max: _map["max"] = self._max + if self._maxlen: _map["maxlen"] = self._maxlen + if self._desc: _map["desc"] = self._desc + if self._reference: _map["reference"] = self._reference + _map["parent_ref"] = self._isParentRef + return _map + + def __repr__(self): return str(self.mapEncode()) + + def _updateHash(self, hasher): + """ + Update the given hash object with a hash computed over this schema. + """ + hasher.update(str(self._type)) + hasher.update(str(self._isIndex)) + hasher.update(str(self._isOptional)) + if self._access: hasher.update(self._access) + if self._unit: hasher.update(self._unit) + if self._desc: hasher.update(self._desc) + + + +def SchemaPropertyFactory( _map ): + """ + Factory for constructing SchemaProperty from a map + + @type _map: dict + @param _map: from mapEncode() of SchemaProperty + @rtype: SchemaProperty + @returns: a new SchemaProperty instance + """ + if type(_map) == dict: + # construct from map + if "amqp_type" not in _map: + raise TypeError("requires 'amqp_type' value") + + return SchemaProperty( _map["amqp_type"], _map ) + + else: + raise TypeError("Invalid type for SchemaProperty construction") + + + + +class SchemaArgument(object): + """ + Describes the structure of an Argument to a Method call. + Map format: + map["amqp_type"] = int, type code indicating argument's data type + map["dir"] = str, direction for an argument associated with a + Method, "I"|"O"|"IO", default value: "I" + optional: + map["desc"] = str, human-readable description of this argument + map["default"] = by amqp_type, default value to use if none provided + """ + __hash__ = None + _dir_strings = ["I", "O", "IO"] + def __init__(self, typeCode, kwargs={}): + self._type = typeCode + self._dir = "I" + self._desc = None + self._default = None + + for key, value in kwargs.items(): + if key == "dir": + value = str(value).upper() + if value not in self._dir_strings: + raise TypeError("invalid value for dir parameter: '%s'" % value) + self._dir = value + elif key == "desc" : self._desc = value + elif key == "default" : self._default = value + + def getType(self): return self._type + + def getDirection(self): return self._dir + + def getDesc(self): return self._desc + + def getDefault(self): return self._default + + def mapEncode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _map["amqp_type"] = self._type + _map["dir"] = self._dir + # optional: + if self._default: _map["default"] = self._default + if self._desc: _map["desc"] = self._desc + return _map + + def __repr__(self): return str(self.mapEncode()) + + def _updateHash(self, hasher): + """ + Update the given hash object with a hash computed over this schema. + """ + hasher.update(str(self._type)) + hasher.update(self._dir) + if self._desc: hasher.update(self._desc) + + + +def SchemaArgumentFactory( param ): + """ + Factory for constructing SchemaArguments from various sources + + @type param: various + @param param: object to use for constructing SchemaArgument + @rtype: SchemaArgument + @returns: a new SchemaArgument instance + """ + if type(param) == dict: + # construct from map + if not "amqp_type" in param: + raise TypeError("requires 'amqp_type' value") + + return SchemaArgument( param["amqp_type"], param ) + + else: + raise TypeError("Invalid type for SchemaArgument construction") + + + +class SchemaMethod(object): + """ + The SchemaMethod class describes the method's structure, and contains a + SchemaProperty class for each argument declared by the method. + + Map format: + map["arguments"] = map of "name"= pairs. + map["desc"] = str, description of the method + """ + def __init__(self, args={}, _desc=None): + """ + Construct a SchemaMethod. + + @type args: map of "name"= objects + @param args: describes the arguments accepted by the method + @type _desc: str + @param _desc: Human-readable description of the schema + """ + self._arguments = args.copy() + self._desc = _desc + + def getDesc(self): return self._desc + + def getArgCount(self): return len(self._arguments) + + def getArguments(self): return self._arguments.copy() + + def getArgument(self, name): return self._arguments[name] + + def addArgument(self, name, schema): + """ + Add an argument to the list of arguments passed to this method. + Used by an agent for dynamically creating method schema. + + @type name: string + @param name: name of new argument + @type schema: SchemaProperty + @param schema: SchemaProperty to add to this method + """ + if not isinstance(schema, SchemaProperty): + raise TypeError("argument must be a SchemaProperty class") + self._arguments[name] = schema + + def mapEncode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _args = {} + for name,val in self._arguments.iteritems(): + _args[name] = val.mapEncode() + _map["arguments"] = _args + if self._desc: _map["desc"] = self._desc + return _map + + def __repr__(self): + result = "(" + first = True + for name,arg in self._arguments.iteritems(): + if arg._dir.find("I") != -1: + if first: + first = False + else: + result += ", " + result += name + result += ")" + return result + + def _updateHash(self, hasher): + """ + Update the given hash object with a hash computed over this schema. + """ + for name,val in self._arguments.iteritems(): + hasher.update(name) + val._updateHash(hasher) + if self._desc: hasher.update(self._desc) + + + +def SchemaMethodFactory( param ): + """ + Factory for constructing a SchemaMethod from various sources + + @type param: various + @param param: object to use for constructing a SchemaMethod + @rtype: SchemaMethod + @returns: a new SchemaMethod instance + """ + if type(param) == dict: + # construct from map + args = {} + desc = None + if "arguments" in param: + for name,val in param["arguments"].iteritems(): + args[name] = SchemaArgumentFactory(val) + if "desc" in param: + desc = param["desc"] + + return SchemaMethod( args, desc ) + + else: + raise TypeError("Invalid type for SchemaMethod construction") + + + +class SchemaClass(object): + """ + Base class for Data and Event Schema classes. + """ + def __init__(self, pname, cname, stype, desc=None, hstr=None): + """ + Schema Class constructor. + + @type pname: str + @param pname: package name + @type cname: str + @param cname: class name + @type stype: str + @param stype: type of schema, either "data" or "event" + @type desc: str + @param desc: Human-readable description of the schema + @type hstr: str + @param hstr: hash computed over the schema body, else None + """ + self._pname = pname + self._cname = cname + self._type = stype + self._desc = desc + if hstr: + self._classId = SchemaClassId( pname, cname, stype, hstr ) + else: + self._classId = None + self._properties = {} + self._methods = {} + self._pkeyNames = [] + + + def getClassId(self): + if not self._classId: + self.generateHash() + return self._classId + + def getDesc(self): return self._desc + + def generateHash(self): + """ + generate an md5 hash over the body of the schema, + and return a string representation of the hash + in format "%08x-%08x-%08x-%08x" + """ + md5Hash = _md5Obj() + md5Hash.update(self._pname) + md5Hash.update(self._cname) + md5Hash.update(self._type) + for name,x in self._properties.iteritems(): + md5Hash.update(name) + x._updateHash( md5Hash ) + for name,x in self._methods.iteritems(): + md5Hash.update(name) + x._updateHash( md5Hash ) + idx = 0 + for name in self._pkeyNames: + md5Hash.update(str(idx) + name) + idx += 1 + hstr = md5Hash.hexdigest()[0:8] + "-" +\ + md5Hash.hexdigest()[8:16] + "-" +\ + md5Hash.hexdigest()[16:24] + "-" +\ + md5Hash.hexdigest()[24:32] + # update classId with new hash value + self._classId = SchemaClassId( self._pname, + self._cname, + self._type, + hstr ) + return hstr + + + def mapEncode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _map["schema_id"] = self.getClassId().mapEncode() + _map["desc"] = self._desc + if len(self._properties): + _props = {} + for name,val in self._properties.iteritems(): + _props[name] = val.mapEncode() + _map["properties"] = _props + + if len(self._methods): + _meths = {} + for name,val in self._methods.iteritems(): + _meths[name] = val.mapEncode() + _map["methods"] = _meths + + if len(self._pkeyNames): + _map["primary_key"] = self._pkeyNames[:] + return _map + + def __repr__(self): + return str(self.getClassId()) + + + +class SchemaObjectClass(SchemaClass): + """ + A schema class that describes a data object. The data object is composed + of zero or more properties and methods. An instance of the SchemaObjectClass + can be identified using a key generated by concantenating the values of + all properties named in the primary key list. + + Map format: + map["schema_id"] = map, SchemaClassId map for this object. + map["desc"] = human readable description of this schema. + map["primary_key"] = ordered list of property names used to construct the Primary Key" + map["properties"] = map of "name":SchemaProperty instances. + map["methods"] = map of "name":SchemaMethods instances. + """ + def __init__( self, pname, cname, desc=None, _hash=None, + _props={}, _pkey=[], _methods={}): + """ + @type pname: str + @param pname: name of package this schema belongs to + @type cname: str + @param cname: class name for this schema + @type desc: str + @param desc: Human-readable description of the schema + @type _hash: str + @param _methods: hash computed on the body of this schema, if known + @type _props: map of 'name': objects + @param _props: all properties provided by this schema + @type _pkey: list of strings + @param _pkey: names of each property to be used for constructing the primary key + @type _methods: map of 'name': objects + @param _methods: all methods provided by this schema + """ + super(SchemaObjectClass, self).__init__(pname, + cname, + SchemaTypeData, + desc, + _hash) + self._properties = _props.copy() + self._pkeyNames = _pkey[:] + self._methods = _methods.copy() + + def getPrimaryKeyList(self): return self._pkeyNames[:] + + def getPropertyCount(self): return len(self._properties) + def getProperties(self): return self._properties.copy() + def getProperty(self, name): return self._properties[name] + + def getMethodCount(self): return len(self._methods) + def getMethods(self): return self._methods.copy() + def getMethod(self, name): return self._methods[name] + + def addProperty(self, name, prop): + self._properties[name] = prop + # need to re-generate schema hash + self._classId = None + + def addMethod(self, name, method): + self._methods[name] = method + self._classId = None + + + +def SchemaObjectClassFactory( param ): + """ + Factory for constructing a SchemaObjectClass from various sources. + + @type param: various + @param param: object to use for constructing a SchemaObjectClass instance + @rtype: SchemaObjectClass + @returns: a new SchemaObjectClass instance + """ + if type(param) == dict: + classId = None + properties = {} + methods = {} + pkey = [] + if "schema_id" in param: + classId = SchemaClassIdFactory(param["schema_id"]) + if (not classId) or (classId.getType() != SchemaTypeData): + raise TypeError("Invalid SchemaClassId specified: %s" % classId) + if "desc" in param: + desc = param["desc"] + if "primary_key" in param: + pkey = param["primary_key"] + if "properties" in param: + for name,val in param["properties"].iteritems(): + properties[name] = SchemaPropertyFactory(val) + if "methods" in param: + for name,val in param["methods"].iteritems(): + methods[name] = SchemaMethodFactory(val) + + return SchemaObjectClass( classId.getPackageName(), + classId.getClassName(), + desc, + _hash = classId.getHashString(), + _props = properties, _pkey = pkey, + _methods = methods) + + else: + raise TypeError("Invalid type for SchemaObjectClass construction") + + + +class SchemaEventClass(SchemaClass): + """ + A schema class that describes an event. The event is composed + of zero or more properties. + + Map format: + map["schema_id"] = map, SchemaClassId map for this object. + map["desc"] = string description of this schema + map["properties"] = map of "name":SchemaProperty values. + """ + def __init__( self, pname, cname, desc=None, _props={}, _hash=None ): + super(SchemaEventClass, self).__init__(pname, + cname, + SchemaTypeEvent, + desc, + _hash ) + self._properties = _props.copy() + + def getPropertyCount(self): return len(self._properties) + def getProperties(self): return self._properties.copy() + def getProperty(self, name): return self._properties[name] + def addProperty(self, name, prop): + self._properties[name] = prop + # need to re-generate schema hash + self._classId = None + + def mapEncode(self): + _map = super(SchemaEventClass, self).mapEncode() + return _map + + + +def SchemaEventClassFactory( param ): + """ + Factory for constructing a SchemaEventClass from various sources. + + @type param: various + @param param: object to use for constructing a SchemaEventClass instance + @rtype: SchemaEventClass + @returns: a new SchemaEventClass instance + """ + if type(param) == dict: + logging.debug( "constructing SchemaEventClass from map '%s'" % param ) + classId = None + properties = {} + if "schema_id" in param: + classId = SchemaClassIdFactory(param["schema_id"]) + if (not classId) or (classId.getType() != SchemaTypeEvent): + raise TypeError("Invalid SchemaClassId specified: %s" % classId) + if "desc" in param: + desc = param["desc"] + if "properties" in param: + for name,val in param["properties"].iteritems(): + properties[name] = SchemaPropertyFactory(val) + + return SchemaEventClass( classId.getPackageName(), + classId.getClassName(), + desc, + _hash = classId.getHashString(), + _props = properties ) + else: + raise TypeError("Invalid type for SchemaEventClass construction") + + + + + + + + + diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py new file mode 100644 index 0000000000..ec22bbbb50 --- /dev/null +++ b/qpid/python/qmf/qmfConsole.py @@ -0,0 +1,1129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import os +import logging +import platform +import time +import Queue +from threading import Thread +from threading import Lock +from threading import currentThread +from threading import Condition + +from qpid.messaging import * + +from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, + AMQP_QMF_AGENT_LOCATE) +from qmfCommon import AgentId + + + +# global flag that indicates which thread (if any) is +# running the console callback +_callback_thread=None + + + + +##============================================================================== +## Sequence Manager +##============================================================================== + +class _Mailbox(object): + """ + Virtual base class for all Mailbox-like objects + """ + def __init__(self): + self._msgs = [] + self._cv = Condition() + self._waiting = False + + def deliver(self, obj): + self._cv.acquire() + try: + self._msgs.append(obj) + # if was empty, notify waiters + if len(self._msgs) == 1: + self._cv.notify() + finally: + self._cv.release() + + def fetch(self, timeout=None): + self._cv.acquire() + try: + if len(self._msgs): + return self._msgs.pop() + self._cv.wait(timeout) + if len(self._msgs): + return self._msgs.pop() + return None + finally: + self._cv.release() + + + +class SequencedWaiter(object): + """ + Manage sequence numbers for asynchronous method calls. + Allows the caller to associate a generic piece of data with a unique sequence + number.""" + + def __init__(self): + self.lock = Lock() + self.sequence = 1L + self.pending = {} + + + def allocate(self): + """ + Reserve a sequence number. + + @rtype: long + @return: a unique nonzero sequence number. + """ + self.lock.acquire() + try: + seq = self.sequence + self.sequence = self.sequence + 1 + self.pending[seq] = _Mailbox() + finally: + self.lock.release() + logging.debug( "sequence %d allocated" % seq) + return seq + + + def put_data(self, seq, new_data): + seq = long(seq) + logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) + self.lock.acquire() + try: + if seq in self.pending: + self.pending[seq].deliver(new_data) + else: + logging.error( "seq %d not found!" % seq ) + finally: + self.lock.release() + + + + def get_data(self, seq, timeout=None): + """ + Release a sequence number reserved using the reserve method. This must + be called when the sequence is no longer needed. + + @type seq: int + @param seq: a sequence previously allocated by calling reserve(). + @rtype: any + @return: the data originally associated with the reserved sequence number. + """ + seq = long(seq) + logging.debug( "getting data for seq=%d" % seq) + mbox = None + self.lock.acquire() + try: + if seq in self.pending: + mbox = self.pending[seq] + finally: + self.lock.release() + + # Note well: pending list is unlocked, so we can wait. + # we reference mbox locally, so it will not be released + # until we are done. + + if mbox: + d = mbox.fetch(timeout) + logging.debug( "seq %d fetched %r!" % (seq, d) ) + return d + + logging.debug( "seq %d not found!" % seq ) + return None + + + def release(self, seq): + """ + Release the sequence, and its mailbox + """ + seq = long(seq) + logging.debug( "releasing seq %d" % seq ) + self.lock.acquire() + try: + if seq in self.pending: + del self.pending[seq] + finally: + self.lock.release() + + + +#class ObjectProxy(QmfObject): +class ObjectProxy(object): + """ + A local representation of a QmfObject that is managed by a remote agent. + """ + def __init__(self, agent, cls, kwargs={}): + """ + @type agent: qmfConsole.AgentProxy + @param agent: Agent that manages this object. + @type cls: qmfCommon.SchemaObjectClass + @param cls: Schema that describes the class. + @type kwargs: dict + @param kwargs: ??? supported keys ??? + """ + # QmfObject.__init__(self, cls, kwargs) + self._agent = agent + + # def update(self): + def refresh(self, timeout = None): + """ + Called to re-fetch the current state of the object from the agent. This updates + the contents of the object to their most current values. + + @rtype: bool + @return: True if refresh succeeded. Refresh may fail if agent does not respond. + """ + if not self._agent: + raise Exception("No Agent associated with this object") + newer = self._agent.get_object(Query({"object_id":object_id}), timeout) + if newer == None: + logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) + raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) + self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class + + ### def _merge_update(self, newerObject): + ### ??? in Rafi's console.py::Object Class + + + ### def is_deleted(self): + ### ??? in Rafi's console.py::Object Class + + def key(self): pass + + + + +class AgentProxy(object): + """ + A local representation of a remote agent managed by this console. + """ + def __init__(self, name): + """ + @type name: AgentId + @param name: uniquely identifies this agent in the AMQP domain. + """ + if not name or not isinstance(name, AgentId): + raise Exception( "Attempt to create an Agent without supplying a valid agent name." ); + + self._name = name + self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name) + self._console = None + self._sender = None + self._packages = [] # list of package names known to this agent + self._classes = {} # dict [key:class] of classes known to this agent + self._subscriptions = [] # list of active standing subscriptions for this agent + self._exists = False # true when Agent Announce is received from this agent + logging.debug( "Created AgentProxy with address: [%s]" % self._address ) + + + def key(self): + return str(self._name) + + + def _send_msg(self, msg): + """ + Low-level routine to asynchronously send a message to this agent. + """ + msg.reply_to = self._console.address() + handle = self._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a sequence id!") + msg.correlation_id = str(handle) + self._sender.send(msg) + return handle + + + + def _fetch_reply_msg(self, handle, timeout=None): + """ + Low-level routine to wait for an expected reply from the agent. + """ + if handle == 0: + raise Exception("Invalid handle") + msg = self._console._req_correlation.get_data( handle, timeout ) + if not msg: + logging.debug("timed out waiting for reply message") + self._console._req_correlation.release( handle ) + return msg + + + def get_packages(self): + """ + Return a list of the names of all packages known to this agent. + """ + return self._packages[:] + + def get_classes(self): + """ + Return a dictionary [key:class] of classes known to this agent. + """ + return self._classes[:] + + def get_objects(self, query, kwargs={}): + """ + Return a list of objects that satisfy the given query. + + @type query: dict, or qmfCommon.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: list + @return: list of matching objects, or None. + """ + pass + + def get_object(self, query, kwargs={}): + """ + Get one object - query is expected to match only one object. + ??? Recommended: explicit timeout param, default None ??? + + @type query: dict, or qmfCommon.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: qmfConsole.ObjectProxy + @return: one matching object, or none + """ + pass + + + def create_subscription(self, query): + """ + Factory for creating standing subscriptions based on a given query. + + @type query: qmfCommon.Query object + @param query: determines the list of objects for which this subscription applies + @rtype: qmfConsole.Subscription + @returns: an object representing the standing subscription. + """ + pass + + def __repr__(self): + return self._address + + def __str__(self): + return self.__repr__() + + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + +class WorkItem(object): + """ + Describes an event that has arrived at the Console for the + application to process. The Notifier is invoked when one or + more of these WorkItems become available for processing. + """ + # + # Enumeration of the types of WorkItems produced by the Console + # + AGENT_ADDED = 1 + AGENT_DELETED = 2 + NEW_PACKAGE = 3 + NEW_CLASS = 4 + OBJECT_UPDATE = 5 + EVENT_RECEIVED = 7 + AGENT_HEARTBEAT = 8 + + def __init__(self, kind, kwargs={}): + """ + Used by the Console to create a work item. + + @type kind: int + @param kind: work item type + """ + self._kind = kind + self._param_map = kwargs + + + def getType(self): + return self._kind + + def getParams(self): + return self._param_map + + + +class Notifier(object): + """ + Virtual base class that defines a call back which alerts the application that + a QMF Console notification is pending. + """ + def console_indication(self): + """ + Called when one or more console items are ready for the console application to process. + This method may be called by the internal console management thread. Its purpose is to + indicate that the console application should process pending items. + """ + pass + + + + +class Console(Thread): + """ + A Console manages communications to a collection of agents on behalf of an application. + """ + def __init__(self, name=None, notifier=None, kwargs={}): + """ + @type name: str + @param name: identifier for this console. Must be unique. + @type notifier: qmfConsole.Notifier + @param notifier: invoked when events arrive for processing. + @type kwargs: dict + @param kwargs: ??? Unused + """ + Thread.__init__(self) + self._name = name + if not self._name: + self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) + self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name + self._notifier = notifier + self._conn = None + self._session = None + # dict of "agent-direct-address":AgentProxy entries + self._agent_map = {} + self._agent_map_lock = Lock() + self._direct_recvr = None + self._announce_recvr = None + self._locate_sender = None + self._schema_cache = {} + self._req_correlation = SequencedWaiter() + self._operational = False + self._agent_discovery = False + # lock out run() thread + self._cv = Condition() + # for passing WorkItems to the application + self._work_q = Queue.Queue() + ## Old stuff below??? + #self._broker_list = [] + #self.impl = qmfengine.Console() + #self._event = qmfengine.ConsoleEvent() + ##self._cv = Condition() + ##self._sync_count = 0 + ##self._sync_result = None + ##self._select = {} + ##self._cb_cond = Condition() + + + + def destroy(self, timeout=None): + """ + Must be called before the Console is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Console...") + if self._conn: + self.remove_connection(self._conn, timeout) + logging.debug("Console Destroyed") + + + + def add_connection(self, conn): + """ + Add a AMQP connection to the console. The console will setup a session over the + connection. The console will then broadcast an Agent Locate Indication over + the session in order to discover present agents. + + @type conn: qpid.messaging.Connection + @param conn: the connection to the AMQP messaging infrastructure. + """ + if self._conn: + raise Exception( "Multiple connections per Console not supported." ); + self._conn = conn + self._session = conn.session(name=self._name) + self._direct_recvr = self._session.receiver(self._address) + self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION) + self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE) + # + # Now that receivers are created, fire off the receive thread... + # + self._operational = True + self.start() + + + + def remove_connection(self, conn, timeout=None): + """ + Remove an AMQP connection from the console. Un-does the add_connection() operation, + and releases any agents and sessions associated with the connection. + + @type conn: qpid.messaging.Connection + @param conn: connection previously added by add_connection() + """ + if self._conn and conn and conn != self._conn: + logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + + # tell connection thread to shutdown + self._operational = False + if self.isAlive(): + # kick my thread to wake it up + logging.debug("Making temp sender for [%s]" % self._address) + tmp_sender = self._session.sender(self._address) + try: + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"console-ping"}, + content={"data":"ignore"}) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for console receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + self._direct_recvr.close() + self._announce_recvr.close() + self._locate_sender.close() + self._session = None + self._conn = None + logging.debug("console connection removal complete") + + + def address(self): + """ + The AMQP address this Console is listening to. + """ + return self._address + + + def create_agent( self, agent_name ): + """ + Factory to create/retrieve an agent for this console + """ + if not isinstance(agent_name, AgentId): + raise TypeError("agent_name must be an instance of AgentId") + + agent = AgentProxy(agent_name) + + self._agent_map_lock.acquire() + try: + if agent_name in self._agent_map: + return self._agent_map[agent_name] + + agent._console = self + agent._sender = self._session.sender(agent._address) + self._agent_map[agent_name] = agent + finally: + self._agent_map_lock.release() + + return agent + + + + def destroy_agent( self, agent ): + """ + Undoes create. + """ + if not isinstance(agent, AgentProxy): + raise TypeError("agent must be an instance of AgentProxy") + + self._agent_map_lock.acquire() + try: + if agent._name in self._agent_map: + del self._agent_map[agent._name] + finally: + self._agent_map_lock.release() + + + + + def find_agent(self, agent_name, timeout=None ): + """ + Given the name of a particular agent, return an AgentProxy representing + that agent. Return None if the agent does not exist. + """ + self._agent_map_lock.acquire() + try: + if agent_name in self._agent_map: + return self._agent_map[agent_name] + finally: + self._agent_map_lock.release() + + new_agent = self.create_agent(agent_name) + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"agent-locate"}, + content={"query": {"vendor" : agent_name.vendor(), + "product" : agent_name.product(), + "name" : agent_name.name()}}) + handle = new_agent._send_msg(msg) + if handle == 0: + raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name)) + + msg = new_agent._fetch_reply_msg(handle, timeout) + if not msg: + logging.debug("Unable to contact agent '%s' - no reply." % agent_name) + self.destroy_agent(new_agent) + return None + # @todo - for now, dump the message + logging.info( "agent-locate reply received for %s" % agent_name) + return new_agent + + + + def run(self): + global _callback_thread + # + # @todo KAG Rewrite when api supports waiting on multiple receivers + # + while self._operational: + + qLen = self._work_q.qsize() + + try: + msg = self._announce_recvr.fetch(timeout = 0) + if msg: + self._rcv_announce(msg) + except: + pass + + try: + msg = self._direct_recvr.fetch(timeout = 0) + if msg: + self._rcv_direct(msg) + except: + pass + + # try: + # logging.error("waiting for next rcvr...") + # rcvr = self._session.next_receiver() + # except: + # logging.error("exception during next_receiver()") + + # logging.error("rcvr=%s" % str(rcvr)) + + + if qLen == 0 and self._work_q.qsize() and self._notifier: + # work queue went non-empty, kick + # the application... + + _callback_thread = currentThread() + logging.info("Calling console indication") + self._notifier.console_indication() + _callback_thread = None + + while self._operational and \ + self._announce_recvr.pending() == 0 and \ + self._direct_recvr.pending(): + time.sleep(0.5) + + logging.debug("Shutting down Console thread") + + + + # called by run() thread ONLY + # + def _rcv_announce(self, msg): + """ + PRIVATE: Process a message received on the announce receiver + """ + logging.info( "Announce message received!" ) + if msg.subject != "qmf4": + logging.debug("Ignoring non-qmf message '%s'" % msg.subject) + return + + amap = {} + if msg.content_type == "amqp/map": + amap = msg.content + + if (not msg.properties or + not "method" in msg.properties or + not "opcode" in msg.properties): + logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties)) + return + + if msg.properties["method"] == "indication": + # agent indication + if msg.properties["opcode"] == "agent": + if "name" in amap: + if self._agent_discovery: + ind = amap["name"] + if "vendor" in ind and "product" in ind and "name" in ind: + + agent = self.create_agent(AgentId( ind["vendor"], + ind["product"], + ind["name"] )) + if not agent._exists: + # new agent + agent._exists = True + logging.info("AGENT_ADDED for %s" % agent) + wi = WorkItem(WorkItem.AGENT_ADDED, + {"agent": agent}) + self._work_q.put(wi) + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % msg.properties["opcode"]) + else: + logging.warning("Ignoring message with unrecognized 'method' value: '%s'" + % msg.properties["method"] ) + + + + + # called by run() thread ONLY + # + def _rcv_direct(self, msg): + """ + PRIVATE: Process a message sent to my direct receiver + """ + logging.info( "direct message received!" ) + if msg.correlation_id: + self._req_correlation.put_data(msg.correlation_id, msg) + + + + def enable_agent_discovery(self): + """ + Called to enable the asynchronous Agent Discovery process. + Once enabled, AGENT_ADD work items can arrive on the WorkQueue. + """ + if not self._agent_discovery: + self._agent_discovery = True + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"agent-locate"}, + content={"query": {"vendor": "*", + "product": "*", + "name": "*"}}) + self._locate_sender.send(msg) + + + + def disable_agent_discovery(self): + """ + Called to disable the async Agent Discovery process enabled by + calling enable_agent_discovery() + """ + self._agent_discovery = False + + + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + + + def get_next_workitem(self, timeout=None): + """ + Returns the next pending work item, or None if none available. + @todo: subclass and return an Empty event instead. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + + def release_workitem(self, wi): + """ + Return a WorkItem to the Console when it is no longer needed. + @todo: call Queue.task_done() - only 2.5+ + + @type wi: class qmfConsole.WorkItem + @param wi: work item object to return. + """ + pass + + + + # def get_packages(self): + # plist = [] + # for i in range(self.impl.packageCount()): + # plist.append(self.impl.getPackageName(i)) + # return plist + + + # def get_classes(self, package, kind=CLASS_OBJECT): + # clist = [] + # for i in range(self.impl.classCount(package)): + # key = self.impl.getClass(package, i) + # class_kind = self.impl.getClassKind(key) + # if class_kind == kind: + # if kind == CLASS_OBJECT: + # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + # elif kind == CLASS_EVENT: + # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + # return clist + + + # def bind_package(self, package): + # return self.impl.bindPackage(package) + + + # def bind_class(self, kwargs = {}): + # if "key" in kwargs: + # self.impl.bindClass(kwargs["key"]) + # elif "package" in kwargs: + # package = kwargs["package"] + # if "class" in kwargs: + # self.impl.bindClass(package, kwargs["class"]) + # else: + # self.impl.bindClass(package) + # else: + # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + # def get_agents(self, broker=None): + # blist = [] + # if broker: + # blist.append(broker) + # else: + # self._cv.acquire() + # try: + # # copy while holding lock + # blist = self._broker_list[:] + # finally: + # self._cv.release() + + # agents = [] + # for b in blist: + # for idx in range(b.impl.agentCount()): + # agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + # return agents + + + # def get_objects(self, query, kwargs = {}): + # timeout = 30 + # agent = None + # temp_args = kwargs.copy() + # if type(query) == type({}): + # temp_args.update(query) + + # if "_timeout" in temp_args: + # timeout = temp_args["_timeout"] + # temp_args.pop("_timeout") + + # if "_agent" in temp_args: + # agent = temp_args["_agent"] + # temp_args.pop("_agent") + + # if type(query) == type({}): + # query = Query(temp_args) + + # self._select = {} + # for k in temp_args.iterkeys(): + # if type(k) == str: + # self._select[k] = temp_args[k] + + # self._cv.acquire() + # try: + # self._sync_count = 1 + # self._sync_result = [] + # broker = self._broker_list[0] + # broker.send_query(query.impl, None, agent) + # self._cv.wait(timeout) + # if self._sync_count == 1: + # raise Exception("Timed out: waiting for query response") + # finally: + # self._cv.release() + + # return self._sync_result + + + # def get_object(self, query, kwargs = {}): + # ''' + # Return one and only one object or None. + # ''' + # objs = objects(query, kwargs) + # if len(objs) == 1: + # return objs[0] + # else: + # return None + + + # def first_object(self, query, kwargs = {}): + # ''' + # Return the first of potentially many objects. + # ''' + # objs = objects(query, kwargs) + # if objs: + # return objs[0] + # else: + # return None + + + # # Check the object against select to check for a match + # def _select_match(self, object): + # schema_props = object.properties() + # for key in self._select.iterkeys(): + # for prop in schema_props: + # if key == p[0].name() and self._select[key] != p[1]: + # return False + # return True + + + # def _get_result(self, list, context): + # ''' + # Called by Broker proxy to return the result of a query. + # ''' + # self._cv.acquire() + # try: + # for item in list: + # if self._select_match(item): + # self._sync_result.append(item) + # self._sync_count -= 1 + # self._cv.notify() + # finally: + # self._cv.release() + + + # def start_sync(self, query): pass + + + # def touch_sync(self, sync): pass + + + # def end_sync(self, sync): pass + + + + +# def start_console_events(self): +# self._cb_cond.acquire() +# try: +# self._cb_cond.notify() +# finally: +# self._cb_cond.release() + + +# def _do_console_events(self): +# ''' +# Called by the Console thread to poll for events. Passes the events +# onto the ConsoleHandler associated with this Console. Is called +# periodically, but can also be kicked by Console.start_console_events(). +# ''' +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# try: +# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: +# logging.debug("Console Event AGENT_ADDED received") +# if self._handler: +# self._handler.agent_added(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: +# logging.debug("Console Event AGENT_DELETED received") +# if self._handler: +# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: +# logging.debug("Console Event NEW_PACKAGE received") +# if self._handler: +# self._handler.new_package(self._event.name) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: +# logging.debug("Console Event NEW_CLASS received") +# if self._handler: +# self._handler.new_class(SchemaClassKey(self._event.classKey)) +# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: +# logging.debug("Console Event OBJECT_UPDATE received") +# if self._handler: +# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), +# self._event.hasProps, self._event.hasStats) +# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: +# logging.debug("Console Event EVENT_RECEIVED received") +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: +# logging.debug("Console Event AGENT_HEARTBEAT received") +# if self._handler: +# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) +# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: +# logging.debug("Console Event METHOD_RESPONSE received") +# else: +# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# except e: +# print "Exception caught in callback thread:", e +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) +# return count + + + + + +# class Broker(ConnectionHandler): +# # attr_reader :impl :conn, :console, :broker_bank +# def __init__(self, console, conn): +# self.broker_bank = 1 +# self.console = console +# self.conn = conn +# self._session = None +# self._cv = Condition() +# self._stable = None +# self._event = qmfengine.BrokerEvent() +# self._xmtMessage = qmfengine.Message() +# self.impl = qmfengine.BrokerProxy(self.console.impl) +# self.console.impl.addConnection(self.impl, self) +# self.conn.add_conn_handler(self) +# self._operational = True + + +# def shutdown(self): +# logging.debug("broker.shutdown() called.") +# self.console.impl.delConnection(self.impl) +# self.conn.del_conn_handler(self) +# if self._session: +# self.impl.sessionClosed() +# logging.debug("broker.shutdown() sessionClosed done.") +# self._session.destroy() +# logging.debug("broker.shutdown() session destroy done.") +# self._session = None +# self._operational = False +# logging.debug("broker.shutdown() done.") + + +# def wait_for_stable(self, timeout = None): +# self._cv.acquire() +# try: +# if self._stable: +# return +# if timeout: +# self._cv.wait(timeout) +# if not self._stable: +# raise Exception("Timed out: waiting for broker connection to become stable") +# else: +# while not self._stable: +# self._cv.wait() +# finally: +# self._cv.release() + + +# def send_query(self, query, ctx, agent): +# agent_impl = None +# if agent: +# agent_impl = agent.impl +# self.impl.sendQuery(query, ctx, agent_impl) +# self.conn.kick() + + +# def _do_broker_events(self): +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: +# logging.debug("Broker Event BROKER_INFO received"); +# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: +# logging.debug("Broker Event DECLARE_QUEUE received"); +# self.conn.impl.declareQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: +# logging.debug("Broker Event DELETE_QUEUE received"); +# self.conn.impl.deleteQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.BIND: +# logging.debug("Broker Event BIND received"); +# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: +# logging.debug("Broker Event UNBIND received"); +# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: +# logging.debug("Broker Event SETUP_COMPLETE received"); +# self.impl.startProtocol() +# elif self._event.kind == qmfengine.BrokerEvent.STABLE: +# logging.debug("Broker Event STABLE received"); +# self._cv.acquire() +# try: +# self._stable = True +# self._cv.notify() +# finally: +# self._cv.release() +# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: +# result = [] +# for idx in range(self._event.queryResponse.getObjectCount()): +# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) +# self.console._get_result(result, self._event.context) +# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: +# obj = self._event.context +# obj._method_result(MethodResponse(self._event.methodResponse())) + +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) + +# return count + + +# def _do_broker_messages(self): +# count = 0 +# valid = self.impl.getXmtMessage(self._xmtMessage) +# while valid: +# count += 1 +# logging.debug("Broker: sending msg on connection") +# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) +# self.impl.popXmt() +# valid = self.impl.getXmtMessage(self._xmtMessage) + +# return count + + +# def _do_events(self): +# while True: +# self.console.start_console_events() +# bcnt = self._do_broker_events() +# mcnt = self._do_broker_messages() +# if bcnt == 0 and mcnt == 0: +# break; + + +# def conn_event_connected(self): +# logging.debug("Broker: Connection event CONNECTED") +# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) +# self.impl.sessionOpened(self._session.handle) +# self._do_events() + + +# def conn_event_disconnected(self, error): +# logging.debug("Broker: Connection event DISCONNECTED") +# pass + + +# def conn_event_visit(self): +# self._do_events() + + +# def sess_event_session_closed(self, context, error): +# logging.debug("Broker: Session event CLOSED") +# self.impl.sessionClosed() + + +# def sess_event_recv(self, context, message): +# logging.debug("Broker: Session event MSG_RECV") +# if not self._operational: +# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# self.impl.handleRcvMessage(message) +# self._do_events() + + + diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py new file mode 100644 index 0000000000..ea7366945a --- /dev/null +++ b/qpid/python/qmf/test/agent_test.py @@ -0,0 +1,58 @@ +import logging +import time + +from qpid.messaging import * +from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty, + SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed, + QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, + QmfEvent, SchemaMethod) +from qmfAgent import (Agent, QmfAgentData) + + +class MyAgent(object): + def main(self): + + self._agent = Agent( "redhat.com", "qmf", "testAgent" ) + + # Dynamically construct a class schema + + _schema = SchemaObjectClass( "MyPackage", "MyClass", + desc="A test data schema", + _pkey=["index1", "index2"] ) + # add properties + _schema.addProperty( "index1", + SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.addProperty( "index2", + SchemaProperty(qmfTypes.TYPE_LSTR)) + + # add method + _meth = SchemaMethod( _desc="A test method" ) + _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) ) + + _schema.addMethod( "meth_3", _meth ) + + # Add schema to Agent + + self._agent.registerObjectClass(_schema) + + # instantiate managed data objects matching the schema + + obj = QmfAgentData( self._agent, _schema ) + obj.setProperty("index1", 100) + obj.setProperty("index2", "a name" ) + + + self._agent.addObject( QmfAgentData( self._agent, _schema, + _props={"index1":99, + "index2": "another name"} )) + + + return None + + + + +app = MyAgent() +print( "s='%s'", str(app.main())) -- cgit v1.2.1