summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-12-10 20:32:21 +0000
committerTed Ross <tross@apache.org>2009-12-10 20:32:21 +0000
commitac3d1986b5673b3de060ec781d206e57b1edd53a (patch)
treeb996a131c743a863e63d5946c53c162aa60906b5
parent7e51d0a18d8ee7d4a19ab64ecb4b0d9dd03b1f73 (diff)
downloadqpid-python-ac3d1986b5673b3de060ec781d206e57b1edd53a.tar.gz
QPID-2261 - Patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@889416 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/Makefile1
-rw-r--r--qpid/python/qmf/qmfAgent.py211
-rw-r--r--qpid/python/qmf/qmfCommon.py1792
-rw-r--r--qpid/python/qmf/qmfConsole.py1129
-rw-r--r--qpid/python/qmf/test/agent_test.py58
5 files changed, 3191 insertions, 0 deletions
diff --git a/qpid/python/Makefile b/qpid/python/Makefile
index ff4a9af4f1..b5e48653d0 100644
--- a/qpid/python/Makefile
+++ b/qpid/python/Makefile
@@ -51,6 +51,7 @@ build: $(TARGETS)
doc:
@mkdir -p $(BUILD)
epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log
+ epydoc qmf.qmfCommon qmf.qmfConsole -o $(BUILD)/doc/qmf --no-private --no-sourcecode --include-log
install: build
install -d $(PYTHON_LIB)
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
new file mode 100644
index 0000000000..59731a234b
--- /dev/null
+++ b/qpid/python/qmf/qmfAgent.py
@@ -0,0 +1,211 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import socket
+import os
+import logging
+from threading import Thread
+from qpid.messaging import Connection, Message
+from uuid import uuid4
+from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
+ AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged)
+
+
+
+ ##==============================================================================
+ ## AGENT
+ ##==============================================================================
+
+class Agent(Thread):
+ def __init__(self, vendor, product, name=None,
+ notifier=None, kwargs={}):
+ Thread.__init__(self)
+ self._running = False
+ self.vendor = vendor
+ self.product = product
+ if name:
+ self.name = name
+ else:
+ self.name = uuid4().get_urn().split(":")[2]
+ self._id = AgentId(self.vendor, self.product, self.name)
+ self._address = str(self._id)
+ self._notifier = notifier
+ self._conn = None
+
+ def getAgentId(self):
+ return AgentId(self.vendor, self.product, self.name)
+
+ def setConnection(self, conn):
+ self._conn = conn
+ self._session = self._conn.session()
+ self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE)
+ self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address)
+ self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
+ self._running = True
+ self._start()
+
+
+ def _dispatch(self, msg):
+ if msg.subject != "qmf4":
+ logging.debug("Ignoring non-qmf message '%s'" % msg.subject)
+ return
+
+ cmap = {}
+ if msg.content_type == "amqp/map":
+ cmap = msg.content
+
+ if (not msg.properties or
+ not "method" in msg.properties or
+ not "opcode" in msg.properties):
+ logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties))
+ return
+
+ if msg.properties["method"] == "request":
+ if msg.properties["opcode"] == "agent-locate":
+ if "query" in cmap:
+ query = cmap["query"]
+ if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and
+ "product" in query and (query["product"] == "*" or query["product"] == self.product) and
+ "name" in query and (query["name"] == "*" or query["name"] == self.name)):
+ logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name))
+ logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id))
+ try:
+ tmp_snd = self.session.sender( msg.reply_to )
+ m = Message( subject="qmf4",
+ properties={"method":"response",
+ "opcode":"agent"},
+ content={"name": {"vendor":"redhat.com",
+ "product":"agent",
+ "name":"tross"}},
+ correlation_id=msg.correlation_id)
+ tmp_snd.send(m)
+ logging.debug("reply-to [%s] sent" % msg.reply_to)
+ except e:
+ logging.error("Failed to send reply to msg '%s'" % str(e))
+
+ else:
+ logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ % msg.properties["opcode"])
+ else:
+ logging.warning("Ignoring message with unrecognized 'method' value: '%s'"
+ % msg.properties["method"] )
+
+
+
+ def run(self):
+ count = 0 # @todo: hack
+ while self._running:
+ try:
+ msg = self._locate_receiver.fetch(1)
+ if msg.content_type == "amqp/map":
+ self._dispatch(msg)
+ except KeyboardInterrupt:
+ break
+ except:
+ pass
+
+ try:
+ msg = self._direct_receiver.fetch(1)
+ if msg.content_type == "amqp/map":
+ self._dispatch(msg)
+ except KeyboardInterrupt:
+ break
+ except:
+ pass
+
+ count+= 1
+ if count == 5:
+ count = 0
+ m = Message( subject="qmf4",
+ properties={"method":"indication",
+ "opcode":"agent"},
+ content={"name": {"vendor":"redhat.com",
+ "product":"agent",
+ "name":"tross"}} )
+ self.ind_sender.send(m)
+ logging.info("Agent Indication Sent")
+
+
+ def registerObjectClass(self, cls):
+ logging.error("!!!Agent.registerObjectClass() TBD!!!")
+
+ def registerEventClass(self, cls):
+ logging.error("!!!Agent.registerEventClass() TBD!!!")
+
+ def raiseEvent(self, qmfEvent):
+ logging.error("!!!Agent.raiseEvent() TBD!!!")
+
+ def addObject(self, qmfAgentData ):
+ logging.error("!!!Agent.addObject() TBD!!!")
+
+ def methodResponse(self, context, status, text, arguments):
+ logging.error("!!!Agent.methodResponse() TBD!!!")
+
+ def getWorkItemCount(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+
+ def getNextWorkItem(self, timeout=None):
+ """
+ Obtains the next pending work item, or None if none available.
+ """
+ logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+
+ def releaseWorkItem(self, wi):
+ """
+ Releases a WorkItem instance obtained by getNextWorkItem(). Called when
+ the application has finished processing the WorkItem.
+ """
+ logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+
+
+
+
+ ##==============================================================================
+ ## OBJECTS
+ ##==============================================================================
+
+
+class QmfAgentData(QmfManaged):
+ """
+ A managed data object that is owned by an agent.
+ """
+ def __init__(self, _agent, _schema, _props={}):
+ """
+ @type _agent: class Agent
+ @param _agent: the agent that manages this object.
+ @type _schema: class SchemaObjectClass
+ @param _schema: the schema used to describe this data object
+ @type _props: map of "name"=<value> pairs
+ @param _props: initial values for all properties in this object
+ """
+ super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(),
+ _schema=_schema,
+ _props=_props)
+
+ def destroy(self):
+ self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000)
+ # @todo: publish change
+
+ def setProperty( self, _name, _value):
+ super(QmfAgentData, self).setProperty(_name, _value)
+ # @todo: publish change
diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py
new file mode 100644
index 0000000000..66162a18bb
--- /dev/null
+++ b/qpid/python/qmf/qmfCommon.py
@@ -0,0 +1,1792 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import socket
+import os
+import time
+import logging
+from threading import Lock
+from threading import Condition
+try:
+ import hashlib
+ _md5Obj = hashlib.md5
+except ImportError:
+ import md5
+ _md5Obj = md5.new
+
+
+
+
+##
+## Constants
+##
+
+AMQP_QMF_TOPIC = "amq.topic"
+AMQP_QMF_DIRECT = "amq.direct"
+AMQP_QMF_NAME_SEPARATOR = "/"
+AMQP_QMF_AGENT_LOCATE = "amq.topic/agent.locate"
+AMQP_QMF_AGENT_INDICATION = "amq.topic/agent.ind"
+
+
+##==============================================================================
+## Agent Identification
+##==============================================================================
+
+class AgentId(object):
+ """
+ Uniquely identifies a management agent within the entire management domain.
+
+ Map format:
+ map["vendor"] = str, name of vendor of the agent
+ map["product"] = str, name of product using agent
+ map["name"] = str, name of agent, unique within vendor and product.
+ """
+ _separator = ":"
+ def __init__(self, vendor, product, name):
+ """
+ Note: this object must be immutable, as it is used to index into a dictionary
+ """
+ self._vendor = vendor
+ self._product = product
+ self._name = name
+
+ def vendor(self):
+ return self._vendor
+
+ def product(self):
+ return self._product
+
+ def name(self):
+ return self._name
+
+ def mapEncode(self):
+ _map = {}
+ _map["vendor"] = self._vendor
+ _map["product"] = self._product
+ _map["name"] = self._name
+ return _map
+
+ def __cmp__(self, other):
+ if not isinstance(other, AgentId) :
+ raise TypeError("Invalid types for compare")
+ # return 1
+ me = str(self)
+ them = str(other)
+
+ if me < them:
+ return -1
+ if me > them:
+ return 1
+ return 0
+
+ def __hash__(self):
+ return (self._vendor, self._product, self._name).__hash__()
+
+ def __repr__(self):
+ return self._vendor + AgentId._separator + \
+ self._product + AgentId._separator + \
+ self._name
+
+ def __str__(self):
+ return self.__repr__()
+
+
+
+def AgentIdFactory( param ):
+ """
+ Factory for constructing an AgentId class from various sources.
+
+ @type param: various
+ @param param: object to use for constructing AgentId
+ @rtype: AgentId
+ @returns: a new AgentId instance
+ """
+ if type(param) == str:
+ if param.count(AgentId._separator) < 2:
+ raise TypeError("AgentId string format must be 'vendor:product:name'")
+ vendor, product, name = param.split(AgentId._separator)
+ return AgentId(vendor, product, name)
+ if type(param) == dict:
+ # construct from map encoding
+ if not "vendor" in param:
+ raise TypeError("requires 'vendor' value")
+ if not "product" in param:
+ raise TypeError("requires 'product' value")
+ if not "name" in param:
+ raise TypeError("requires 'name' value")
+ return AgentId( param["vendor"], param["product"], param["name"] )
+ raise TypeError("Invalid type for AgentId construction")
+
+
+
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
+
+
+class ObjectId(object):
+ """
+ An instance of managed object must be uniquely identified within the
+ management system. Each managed object is given a key that is unique
+ within the domain of the object's managing Agent. Note that these
+ keys are not unique across Agents. Therefore, a globally unique name
+ for an instance of a managed object is the concatenation of the
+ object's key and the managing Agent's AgentId.
+
+ Map format:
+ map["agent_id"] = map representation of AgentId
+ map["primary_key"] = str, key for managed object
+ """
+ def __init__(self, agentid, name):
+ if not isinstance(agentid, AgentId):
+ raise TypeError("requires an AgentId class")
+ self._agentId = agentid;
+ self._name = name;
+
+ def getAgentId(self):
+ """
+ @rtype: class AgentId
+ @returns: Id of agent that manages the object.
+ """
+ return self._agentId
+
+ def getPrimaryKey(self):
+ """
+ @rtype: str
+ @returns: Key of managed object.
+ """
+ return self._name
+
+ def mapEncode(self):
+ _map = {}
+ _map["agent_id"] = self._agentId.mapEncode()
+ _map["primary_key"] = self._name
+ return _map
+
+ def __repr__(self):
+ return "%s:%s" % (self._agentId, self._name)
+
+ def __cmp__(self, other):
+ if not isinstance(other, ObjectId) :
+ raise TypeError("Invalid types for compare")
+
+ if self._agentId < other._agentId:
+ return -1
+ if self._agentId > other._agentId:
+ return 1
+ if self._name < other._name:
+ return -1
+ if self._name > other._name:
+ return 1
+ return 0
+
+ def __hash__(self):
+ return (hash(self._agentId), self._name).__hash__()
+
+
+def ObjectIdFactory( param ):
+ """
+ Factory for constructing ObjectIds from various sources
+
+ @type param: various
+ @param param: object to use for constructing ObjectId
+ @rtype: ObjectId
+ @returns: a new ObjectId instance
+ """
+ if type(param) == dict:
+ # construct from map
+ if "agent_id" not in param:
+ raise TypeError("requires 'agent_id' value")
+ if "primary_key" not in param:
+ raise TypeError("requires 'primary_key' value")
+
+ return ObjectId( AgentIdFactory(param["agent_id"]), param["primary_key"] )
+
+ else:
+ raise TypeError("Invalid type for ObjectId construction")
+
+
+
+
+class QmfData(object):
+ """
+ Base data class representing arbitrarily structure data. No schema or
+ managing agent is associated with data of this class.
+
+ Map format:
+ map["properties"] = map of unordered "name"=<value> pairs (optional)
+ """
+ def __init__(self, _props={}, _const=False):
+ """
+ @type _props: dict
+ @param _props: dictionary of initial name=value pairs for object's property data.
+ @type _const: boolean
+ @param _const: if true, this object cannot be modified
+ """
+ self._properties = _props.copy()
+ self._const = _const
+ self._managed = False # not managed by an Agent
+ self._described = False # not described by a schema
+
+ def isManaged(self):
+ return self._managed
+
+ def isDescribed(self):
+ return self._described
+
+ def getProperties(self):
+ return self._properties.copy()
+
+ def getProperty(self, _name):
+ return self._properties[_name]
+
+ def setProperty(self, _name, _value):
+ if self._const:
+ raise Exception("cannot modify constant data object")
+ self._properties[_name] = _value
+ return _value
+
+ def mapEncode(self):
+ return self._properties.copy()
+
+ def __repr__(self):
+ return str(self.mapEncode())
+
+ def __setattr__(self, _name, _value):
+ # ignore private data members
+ if _name[0] == '_':
+ return super.__setattr__(self, _name, _value)
+ # @todo: this is bad - what if the same name is used for a
+ # property and statistic or argument?
+ if _name in self._properties:
+ return self.setProperty(_name, _value)
+ return super.__setattr__(self, _name, _value)
+
+ def __getattr__(self, _name):
+ # @todo: this is bad - what if the same name is used for a
+ # property and statistic or argument?
+ if _name in self._properties: return self.getProperty(_name)
+ raise AttributeError("no item named '%s' in this object" % _name)
+
+ def __getitem__(self, _name):
+ return self.__getattr__(_name)
+
+ def __setitem__(self, _name, _value):
+ return self.__setattr__(_name, _value)
+
+
+def QmfDataFactory( param, const=False ):
+ """
+ Factory for constructing an QmfData class from various sources.
+
+ @type param: various
+ @param param: object to use for constructing QmfData instance
+ @rtype: QmfData
+ @returns: a new QmfData instance
+ """
+ if type(param) == dict:
+ # construct from map
+ return QmfData( _props=param, _const=const )
+
+ else:
+ raise TypeError("Invalid type for QmfData construction")
+
+
+
+class QmfDescribed(QmfData):
+ """
+ Data that has a formally defined structure is represented by the
+ QmfDescribed class. This class extends the QmfData class by
+ associating the data with a formal schema (SchemaObjectClass).
+
+ Map format:
+ map["schema_id"] = map representation of a SchemaClassId instance
+ map["properties"] = map representation of a QmfData instance
+ """
+ def __init__(self, _schema=None, _schemaId=None, _props={}, _const=False ):
+ """
+ @type _schema: class SchemaClass or derivative
+ @param _schema: an instance of the schema used to describe this object.
+ @type _schemaId: class SchemaClassId
+ @param _schemaId: if schema instance not available, this is mandatory.
+ @type _props: dict
+ @param _props: dictionary of initial name=value pairs for object's property data.
+ @type _const: boolean
+ @param _const: if true, this object cannot be modified
+ """
+ super(QmfDescribed, self).__init__(_props, _const)
+ self._validated = False
+ self._described = True
+ self._schema = _schema
+ if _schema:
+ self._schemaId = _schema.getClassId()
+ if self._const:
+ self._validate()
+ else:
+ if _schemaId:
+ self._schemaId = _schemaId
+ else:
+ raise Exception("A SchemaClass or SchemaClassId must be provided")
+
+
+ def getSchemaClassId(self):
+ """
+ @rtype: class SchemaClassId
+ @returns: the identifier of the Schema that describes the structure of the data.
+ """
+ return self._schemaId
+
+ def setSchema(self, _schema):
+ """
+ @type _schema: class SchemaClass or derivative
+ @param _schema: instance of schema used to describe the structure of the data.
+ """
+ if self._schemaId != _schema.getClassId():
+ raise Exception("Cannot reset current schema to a different schema")
+ oldSchema = self._schema
+ self._schema = _schema
+ if not oldSchema and self._const:
+ self._validate()
+
+ def getPrimaryKey(self):
+ """
+ Get a string composed of the object's primary key properties.
+ @rtype: str
+ @returns: a string composed from primary key property values.
+ """
+ if not self._schema:
+ raise Exception("schema not available")
+
+ if not self._validated:
+ self._validate()
+
+ if self._schema._pkeyNames == 0:
+ if len(self._properties) != 1:
+ raise Exception("no primary key defined")
+ return str(self._properties.values()[0])
+
+ result = u""
+ for pkey in self._schema._pkeyNames:
+ if result != u"":
+ result += u":"
+ try:
+ valstr = unicode(self._properties[pkey])
+ except:
+ valstr = u"<undecodable>"
+ result += valstr
+ return result
+
+
+ def mapEncode(self):
+ _map = {}
+ _map["schema_id"] = self._schemaId.mapEncode()
+ _map["properties"] = super(QmfDescribed, self).mapEncode()
+ return _map
+
+ def _validate(self):
+ """
+ Compares this object's data against the associated schema. Throws an
+ exception if the data does not conform to the schema.
+ """
+ for name,val in self._schema._properties.iteritems():
+ # @todo validate: type compatible with amqp_type?
+ # @todo validate: primary keys have values
+ if name not in self._properties:
+ if val._isOptional:
+ # ok not to be present, put in dummy value
+ # to simplify access
+ self._properties[name] = None
+ else:
+ raise Exception("Required property '%s' not present." % name)
+ self._validated = True
+
+
+
+def QmfDescribedFactory( param, _schema=None ):
+ """
+ Factory for constructing an QmfDescribed class from various sources.
+
+ @type param: various
+ @param param: object to use for constructing QmfDescribed instance
+ @type _schema: SchemaClass
+ @param _schema: instance of the SchemaClass that describes this instance
+ @rtype: QmfDescribed
+ @returns: a new QmfDescribed instance
+ """
+ if type(param) == dict:
+ # construct from map
+ if "schema_id" not in param:
+ raise TypeError("requires 'schema_id' value")
+ if "properties" not in param:
+ raise TypeError("requires 'properties' value")
+
+ return QmfDescribed( _schema=_schema, _schemaId=SchemaClassIdFactory( param["schema_id"] ),
+ _props = param["properties"] )
+ else:
+ raise TypeError("Invalid type for QmfDescribed construction")
+
+
+
+class QmfManaged(QmfDescribed):
+ """
+ Data that has a formally defined structure, and for which each
+ instance of the data is managed by a particular Agent is represented
+ by the QmfManaged class. This class extends the QmfDescribed class by
+ associating the described data with a particular Agent.
+
+ Map format:
+ map["object_id"] = map representation of an ObjectId value
+ map["schema_id"] = map representation of a SchemaClassId instance
+ map["qmf_data"] = map representation of a QmfData instance
+ map["timestamps"] = array of AMQP timestamps. [0] = time of last update,
+ [1] = creation timestamp, [2] = deletion timestamp or zero.
+ """
+ _ts_update = 0
+ _ts_create = 1
+ _ts_delete = 2
+ def __init__( self, _agentId=None, _schema=None, _schemaId=None,
+ _props={}, _const=False ):
+ """
+ @type _agentId: class AgentId
+ @param _agentId: globally unique identifier of the managing agent.
+ @type _schema: class SchemaClass or derivative
+ @param _schema: an instance of the schema used to describe this object.
+ @type _schemaId: class SchemaClassId
+ @param _schemaId: if schema instance not available, this is mandatory.
+ @type _props: dict
+ @param _props: dictionary of initial name=value pairs for object's property data.
+ @type _const: boolean
+ @param _const: if true, this object cannot be modified
+ """
+ super(QmfManaged, self).__init__(_schema=_schema, _schemaId=_schemaId,
+ _props=_props, _const=_const)
+ self._managed = True
+ self._agentId = _agentId
+ # timestamp, in millisec since epoch UTC
+ _ctime = long(time.time() * 1000)
+ self._timestamps = [_ctime,_ctime,0]
+
+
+ def getObjectId(self):
+ """
+ @rtype: class ObjectId
+ @returns: the ObjectId that uniquely identifies this managed object.
+ """
+ return ObjectId(self._agentId, self.getPrimaryKey())
+
+ def isDeleted(self):
+ return self._timestamps[QmfManaged._ts_delete] == 0
+
+ def mapEncode(self):
+ _map = super(QmfManaged, self).mapEncode()
+ _map["agent_id"] = self._agentId.mapEncode()
+ _map["timestamps"] = self._timestamps[:]
+ return _map
+
+
+
+def QmfManagedFactory( param, _schema=None ):
+ """
+ Factory for constructing an QmfManaged instance from various sources.
+
+ @type param: various
+ @param param: object to use for constructing QmfManaged instance
+ @type _schema: SchemaClass
+ @param _schema: instance of the SchemaClass that describes this instance
+ @rtype: QmfManaged
+ @returns: a new QmfManaged instance
+ """
+ if type(param) == dict:
+ # construct from map
+ if "agent_id" not in param:
+ raise TypeError("requires 'agent_id' value")
+ _qd = QmfDescribedFactory( param, _schema )
+
+ return QmfManaged( _agentId = AgentIdFactory(param["agent_id"]),
+ _schema=_schema, _schemaId=_qd._schemaId,
+ _props = _qd._properties )
+ else:
+ raise TypeError("Invalid type for QmfManaged construction")
+
+
+
+class QmfEvent(QmfDescribed):
+ """
+ A QMF Event is a type of described data that is not managed. Events are
+ notifications that are sent by Agents. An event notifies a Console of a
+ change in some aspect of the system under managment.
+ """
+ def __init__(self, _map=None,
+ _timestamp=None, _agentId=None,
+ _schema=None, _schemaId=None,
+ _props={}, _const=False):
+ """
+ @type _map: dict
+ @param _map: if not None, construct instance from map representation.
+ @type _timestamp: int
+ @param _timestamp: moment in time when event occurred, expressed
+ as milliseconds since Midnight, Jan 1, 1970 UTC.
+ @type _agentId: class AgentId
+ @param _agentId: Identifies agent issuing this event.
+ @type _schema: class Schema
+ @param _schema:
+ @type _schemaId: class SchemaClassId (event)
+ @param _schemaId: identi
+ """
+ if _map:
+ if type(_map) != dict:
+ raise TypeError("parameter '_map' must be of type 'dict'")
+ if "timestamp" not in _map:
+ pass
+ if "agent_id" not in _map:
+ pass
+ _qe = QmfDescribedFactory( _map, _schema )
+ super(QmfEvent, self).__init__( _schema=_qe._schema, _schemaId=_qe._schemaId,
+ _props=_qe._properties, _const=_qe._const )
+ self._timestamp = long(_map["timestamp"])
+ self._agentId = AgentIdFactory(_map["agent_id"])
+ else:
+ super(QmfEvent, self).__init__(_schema=_schema, _schemaId=_schemaId,
+ _props=_props, _const=_const)
+ self._timestamp = long(_timestamp)
+ self._agentId = _agentId
+
+ def getTimestamp(self): return self._timestamp
+
+ def getAgentId(self): return self._agentId
+
+ def mapEncode(self):
+ _map = super(QmfEvent, self).mapEncode()
+ _map["timestamp"] = self._timestamp
+ _map["agent_id"] = self._agentId.mapEncode()
+ return _map
+
+
+
+#==============================================================================
+#==============================================================================
+#==============================================================================
+
+
+
+class QmfObject(object):
+ # attr_reader :impl, :object_class
+ def __init__(self, cls, kwargs={}):
+ pass
+# self._cv = Condition()
+# self._sync_count = 0
+# self._sync_result = None
+# self._allow_sets = False
+# if kwargs.has_key("broker"):
+# self._broker = kwargs["broker"]
+# else:
+# self._broker = None
+# if cls:
+# self.object_class = cls
+# self.impl = qmfengine.Object(self.object_class.impl)
+# elif kwargs.has_key("impl"):
+# self.impl = qmfengine.Object(kwargs["impl"])
+# self.object_class = SchemaObjectClass(None,
+# None,
+# {"impl":self.impl.getClass()})
+# else:
+# raise Exception("Argument error: required parameter ('impl') not supplied")
+
+
+# def destroy(self):
+# self.impl.destroy()
+
+
+# def object_id(self):
+# return ObjectId(self.impl.getObjectId())
+
+
+# def set_object_id(self, oid):
+# self.impl.setObjectId(oid.impl)
+
+
+# def properties(self):
+# list = []
+# for prop in self.object_class.properties:
+# list.append([prop, self.get_attr(prop.name())])
+# return list
+
+
+# def statistics(self):
+# list = []
+# for stat in self.object_class.statistics:
+# list.append([stat, self.get_attr(stat.name())])
+# return list
+
+
+# def get_attr(self, name):
+# val = self._value(name)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.asUint()
+# elif vType == TYPE_UINT16: return val.asUint()
+# elif vType == TYPE_UINT32: return val.asUint()
+# elif vType == TYPE_UINT64: return val.asUint64()
+# elif vType == TYPE_SSTR: return val.asString()
+# elif vType == TYPE_LSTR: return val.asString()
+# elif vType == TYPE_ABSTIME: return val.asInt64()
+# elif vType == TYPE_DELTATIME: return val.asUint64()
+# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
+# elif vType == TYPE_BOOL: return val.asBool()
+# elif vType == TYPE_FLOAT: return val.asFloat()
+# elif vType == TYPE_DOUBLE: return val.asDouble()
+# elif vType == TYPE_UUID: return val.asUuid()
+# elif vType == TYPE_INT8: return val.asInt()
+# elif vType == TYPE_INT16: return val.asInt()
+# elif vType == TYPE_INT32: return val.asInt()
+# elif vType == TYPE_INT64: return val.asInt64()
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
+# return None
+
+
+# def set_attr(self, name, v):
+# val = self._value(name)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.setUint(v)
+# elif vType == TYPE_UINT16: return val.setUint(v)
+# elif vType == TYPE_UINT32: return val.setUint(v)
+# elif vType == TYPE_UINT64: return val.setUint64(v)
+# elif vType == TYPE_SSTR:
+# if v: return val.setString(v)
+# else: return val.setString('')
+# elif vType == TYPE_LSTR:
+# if v: return val.setString(v)
+# else: return val.setString('')
+# elif vType == TYPE_ABSTIME: return val.setInt64(v)
+# elif vType == TYPE_DELTATIME: return val.setUint64(v)
+# elif vType == TYPE_REF: return val.setObjectId(v.impl)
+# elif vType == TYPE_BOOL: return val.setBool(v)
+# elif vType == TYPE_FLOAT: return val.setFloat(v)
+# elif vType == TYPE_DOUBLE: return val.setDouble(v)
+# elif vType == TYPE_UUID: return val.setUuid(v)
+# elif vType == TYPE_INT8: return val.setInt(v)
+# elif vType == TYPE_INT16: return val.setInt(v)
+# elif vType == TYPE_INT32: return val.setInt(v)
+# elif vType == TYPE_INT64: return val.setInt64(v)
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
+# return None
+
+
+# def __getitem__(self, name):
+# return self.get_attr(name)
+
+
+# def __setitem__(self, name, value):
+# self.set_attr(name, value)
+
+
+# def inc_attr(self, name, by=1):
+# self.set_attr(name, self.get_attr(name) + by)
+
+
+# def dec_attr(self, name, by=1):
+# self.set_attr(name, self.get_attr(name) - by)
+
+
+
+
+# def _invokeMethod(self, name, argMap):
+# """
+# Private: Helper function that invokes an object's method, and waits for the result.
+# """
+# self._cv.acquire()
+# try:
+# timeout = 30
+# self._sync_count = 1
+# self.impl.invokeMethod(name, argMap, self)
+# if self._broker:
+# self._broker.conn.kick()
+# self._cv.wait(timeout)
+# if self._sync_count == 1:
+# raise Exception("Timed out: waiting for response to method call.")
+# finally:
+# self._cv.release()
+
+# return self._sync_result
+
+
+# def _method_result(self, result):
+# """
+# Called to return the result of a method call on an object
+# """
+# self._cv.acquire();
+# try:
+# self._sync_result = result
+# self._sync_count -= 1
+# self._cv.notify()
+# finally:
+# self._cv.release()
+
+
+# def _marshall(schema, args):
+# '''
+# Private: Convert a list of arguments (positional) into a Value object of type "map".
+# Used to create the argument parameter for an object's method invokation.
+# '''
+# # Build a map of the method's arguments
+# map = qmfengine.Value(TYPE_MAP)
+# for arg in schema.arguments:
+# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
+# map.insert(arg.name, qmfengine.Value(arg.typecode))
+
+# # install each argument's value into the map
+# marshalled = Arguments(map)
+# idx = 0
+# for arg in schema.arguments:
+# if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
+# if args[idx]:
+# marshalled[arg.name] = args[idx]
+# idx += 1
+
+# return marshalled.map
+
+
+# def _value(self, name):
+# val = self.impl.getValue(name)
+# if not val:
+# raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" %
+# (name,
+# self.object_class.impl.getClassKey().getPackageName(),
+# self.object_class.impl.getClassKey().getClassName()))
+# return val
+
+
+
+
+
+class Arguments(object):
+ def __init__(self, map):
+ pass
+# self.map = map
+# self._by_hash = {}
+# key_count = self.map.keyCount()
+# a = 0
+# while a < key_count:
+# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a))
+# a += 1
+
+
+# def __getitem__(self, key):
+# return self._by_hash[key]
+
+
+# def __setitem__(self, key, value):
+# self._by_hash[key] = value
+# self.set(key, value)
+
+
+# def __iter__(self):
+# return self._by_hash.__iter__
+
+
+# def __getattr__(self, name):
+# if name in self._by_hash:
+# return self._by_hash[name]
+# return super.__getattr__(self, name)
+
+
+# def __setattr__(self, name, value):
+# #
+# # ignore local data members
+# #
+# if (name[0] == '_' or
+# name == 'map'):
+# return super.__setattr__(self, name, value)
+
+# if name in self._by_hash:
+# self._by_hash[name] = value
+# return self.set(name, value)
+
+# return super.__setattr__(self, name, value)
+
+
+# def by_key(self, key):
+# val = self.map.byKey(key)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.asUint()
+# elif vType == TYPE_UINT16: return val.asUint()
+# elif vType == TYPE_UINT32: return val.asUint()
+# elif vType == TYPE_UINT64: return val.asUint64()
+# elif vType == TYPE_SSTR: return val.asString()
+# elif vType == TYPE_LSTR: return val.asString()
+# elif vType == TYPE_ABSTIME: return val.asInt64()
+# elif vType == TYPE_DELTATIME: return val.asUint64()
+# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
+# elif vType == TYPE_BOOL: return val.asBool()
+# elif vType == TYPE_FLOAT: return val.asFloat()
+# elif vType == TYPE_DOUBLE: return val.asDouble()
+# elif vType == TYPE_UUID: return val.asUuid()
+# elif vType == TYPE_INT8: return val.asInt()
+# elif vType == TYPE_INT16: return val.asInt()
+# elif vType == TYPE_INT32: return val.asInt()
+# elif vType == TYPE_INT64: return val.asInt64()
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
+# return None
+
+
+# def set(self, key, value):
+# val = self.map.byKey(key)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.setUint(value)
+# elif vType == TYPE_UINT16: return val.setUint(value)
+# elif vType == TYPE_UINT32: return val.setUint(value)
+# elif vType == TYPE_UINT64: return val.setUint64(value)
+# elif vType == TYPE_SSTR:
+# if value:
+# return val.setString(value)
+# else:
+# return val.setString('')
+# elif vType == TYPE_LSTR:
+# if value:
+# return val.setString(value)
+# else:
+# return val.setString('')
+# elif vType == TYPE_ABSTIME: return val.setInt64(value)
+# elif vType == TYPE_DELTATIME: return val.setUint64(value)
+# elif vType == TYPE_REF: return val.setObjectId(value.impl)
+# elif vType == TYPE_BOOL: return val.setBool(value)
+# elif vType == TYPE_FLOAT: return val.setFloat(value)
+# elif vType == TYPE_DOUBLE: return val.setDouble(value)
+# elif vType == TYPE_UUID: return val.setUuid(value)
+# elif vType == TYPE_INT8: return val.setInt(value)
+# elif vType == TYPE_INT16: return val.setInt(value)
+# elif vType == TYPE_INT32: return val.setInt(value)
+# elif vType == TYPE_INT64: return val.setInt64(value)
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
+# return None
+
+
+
+class MethodResponse(object):
+ def __init__(self, impl):
+ pass
+# self.impl = qmfengine.MethodResponse(impl)
+
+
+# def status(self):
+# return self.impl.getStatus()
+
+
+# def exception(self):
+# return self.impl.getException()
+
+
+# def text(self):
+# return exception().asString()
+
+
+# def args(self):
+# return Arguments(self.impl.getArgs())
+
+
+# def __getattr__(self, name):
+# myArgs = self.args()
+# return myArgs.__getattr__(name)
+
+
+# def __setattr__(self, name, value):
+# if name == 'impl':
+# return super.__setattr__(self, name, value)
+
+# myArgs = self.args()
+# return myArgs.__setattr__(name, value)
+
+
+
+# ##==============================================================================
+# ## QUERY
+# ##==============================================================================
+
+
+class Query:
+ def __init__(self, kwargs={}):
+ pass
+# if "impl" in kwargs:
+# self.impl = kwargs["impl"]
+# else:
+# package = ''
+# if "key" in kwargs:
+# # construct using SchemaClassKey:
+# self.impl = qmfengine.Query(kwargs["key"])
+# elif "object_id" in kwargs:
+# self.impl = qmfengine.Query(kwargs["object_id"].impl)
+# else:
+# if "package" in kwargs:
+# package = kwargs["package"]
+# if "class" in kwargs:
+# self.impl = qmfengine.Query(kwargs["class"], package)
+# else:
+# raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']")
+
+
+# def package_name(self): return self.impl.getPackage()
+# def class_name(self): return self.impl.getClass()
+# def object_id(self):
+# _objid = self.impl.getObjectId()
+# if _objid:
+# return ObjectId(_objid)
+# else:
+# return None
+
+
+##==============================================================================
+## SCHEMA
+##==============================================================================
+
+# known schema types
+
+SchemaTypeData = "data"
+SchemaTypeEvent = "event"
+
+# format convention for schema hash
+
+_schemaHashStrFormat = "%08x-%08x-%08x-%08x"
+_schemaHashStrDefault = "00000000-00000000-00000000-00000000"
+
+# Argument typecodes, access, and direction qualifiers
+
+class qmfTypes(object):
+ TYPE_UINT8 = 1
+ TYPE_UINT16 = 2
+ TYPE_UINT32 = 3
+ TYPE_UINT64 = 4
+ TYPE_SSTR = 6
+ TYPE_LSTR = 7
+ TYPE_ABSTIME = 8
+ TYPE_DELTATIME = 9
+ TYPE_REF = 10
+ TYPE_BOOL = 11
+ TYPE_FLOAT = 12
+ TYPE_DOUBLE = 13
+ TYPE_UUID = 14
+ TYPE_MAP = 15
+ TYPE_INT8 = 16
+ TYPE_INT16 = 17
+ TYPE_INT32 = 18
+ TYPE_INT64 = 19
+ TYPE_OBJECT = 20
+ TYPE_LIST = 21
+ TYPE_ARRAY = 22
+
+
+class qmfAccess(object):
+ READ_CREATE = 1
+ READ_WRITE = 2
+ READ_ONLY = 3
+
+
+class qmfDirection(object):
+ DIR_IN = 1
+ DIR_OUT = 2
+ DIR_IN_OUT = 3
+
+
+
+def _toBool( param ):
+ """
+ Helper routine to convert human-readable representations of
+ boolean values to python bool types.
+ """
+ _false_strings = ["off", "no", "false", "0", "none"]
+ _true_strings = ["on", "yes", "true", "1"]
+ if type(param) == str:
+ lparam = param.lower()
+ if lparam in _false_strings:
+ return False
+ if lparam in _true_strings:
+ return True
+ raise TypeError("unrecognized boolean string: '%s'" % param )
+ else:
+ return bool(param)
+
+
+
+class SchemaClassId(object):
+ """
+ Unique identifier for an instance of a SchemaClass.
+
+ Map format:
+ map["package_name"] = str, name of associated package
+ map["class_name"] = str, name of associated class
+ map["type"] = str, "data"|"event", default: "data"
+ optional:
+ map["hash_str"] = str, hash value in standard format or None
+ if hash is unknown.
+ """
+ def __init__(self, pname, cname, stype=SchemaTypeData, hstr=None):
+ """
+ @type pname: str
+ @param pname: the name of the class's package
+ @type cname: str
+ @param cname: name of the class
+ @type stype: str
+ @param stype: schema type [SchemaTypeData | SchemaTypeEvent]
+ @type hstr: str
+ @param hstr: the hash value in '%08x-%08x-%08x-%08x' format
+ """
+ self._pname = pname
+ self._cname = cname
+ if stype != SchemaTypeData and stype != SchemaTypeEvent:
+ raise TypeError("Invalid SchemaClassId type: '%s'" % stype)
+ self._type = stype
+ self._hstr = hstr
+ if self._hstr:
+ try:
+ # sanity check the format of the hash string
+ hexValues = hstr.split("-")
+ h0 = int(hexValues[0], 16)
+ h1 = int(hexValues[1], 16)
+ h2 = int(hexValues[2], 16)
+ h3 = int(hexValues[3], 16)
+ except:
+ raise Exception("Invalid SchemaClassId format: bad hash string: '%s':"
+ % hstr)
+
+ def getPackageName(self):
+ """
+ Access the package name in the SchemaClassId.
+
+ @rtype: str
+ """
+ return self._pname
+
+
+ def getClassName(self):
+ """
+ Access the class name in the SchemaClassId
+
+ @rtype: str
+ """
+ return self._cname
+
+
+ def getHashString(self):
+ """
+ Access the schema's hash as a string value
+
+ @rtype: str
+ """
+ return self._hstr
+
+
+ def getType(self):
+ """
+ Returns the type code associated with this Schema
+
+ @rtype: str
+ """
+ return self._type
+
+ def mapEncode(self):
+ _map = {}
+ _map["package_name"] = self._pname
+ _map["class_name"] = self._cname
+ _map["type"] = self._type
+ if self._hstr: _map["hash_str"] = self._hstr
+ return _map
+
+ def __repr__(self):
+ if self._type == SchemaTypeEvent:
+ stype = "event"
+ else:
+ stype = "data"
+ hstr = self.getHashString()
+ if not hstr:
+ hstr = _schemaHashStrDefault
+ return self._pname + ":" + self._cname + ":" + stype + "(" + hstr + ")"
+
+
+ def __cmp__(self, other):
+ if not isinstance(other, SchemaClassId) :
+ raise TypeError("Invalid types for compare")
+ # return 1
+ me = str(self)
+ them = str(other)
+ if me < them:
+ return -1
+ if me > them:
+ return 1
+ return 0
+
+
+ def __hash__(self):
+ return (self._pname, self._cname, self._hstr).__hash__()
+
+
+
+def SchemaClassIdFactory( param ):
+ """
+ Factory for constructing SchemaClassIds from various sources
+
+ @type param: various
+ @param param: object to use for constructing SchemaClassId
+ @rtype: SchemaClassId
+ @returns: a new SchemaClassId instance
+ """
+ if type(param) == str:
+ # construct from __repr__/__str__ representation
+ try:
+ pname, cname, rest = param.split(":")
+ stype,rest = rest.split("(");
+ hstr = rest.rstrip(")");
+ if hstr == _schemaHashStrDefault:
+ hstr = None
+ return SchemaClassId( pname, cname, stype, hstr )
+ except:
+ raise TypeError("Invalid string format: '%s'" % param)
+
+ if type(param) == dict:
+ # construct from map representation
+ if "package_name" in param:
+ pname = param["package_name"]
+ else:
+ raise TypeError("'package_name' attribute is required")
+
+ if "class_name" in param:
+ cname = param["class_name"]
+ else:
+ raise TypeError("'class_name' attribute is required")
+
+ hstr = None
+ if "hash_str" in param:
+ hstr = param["hash_str"]
+
+ stype = "data"
+ if "type" in param:
+ stype = param["type"]
+
+ return SchemaClassId( pname, cname, stype, hstr )
+
+ else:
+ raise TypeError("Invalid type for SchemaClassId construction")
+
+
+
+class SchemaProperty(object):
+ """
+ Describes the structure of a Property data object.
+ Map format:
+ map["amqp_type"] = int, AMQP type code indicating property's data type
+
+ optional:
+ map["access"] = str, access allowed to this property, default "RO"
+ map["index"] = bool, True if this property is an index value, default False
+ map["optional"] = bool, True if this property is optional, default False
+ map["unit"] = str, describes units used
+ map["min"] = int, minimum allowed value
+ map["max"] = int, maximun allowed value
+ map["maxlen"] = int, if string type, this is the maximum length in bytes
+ required to represent the longest instance of this string.
+ map["desc"] = str, human-readable description of this argument
+ map["reference"] = str, ???
+ map["parent_ref"] = bool, true if this property references an object in
+ which this object is in a child-parent relationship. Default False
+ """
+ __hash__ = None
+ _access_strings = ["RO","RW","RC"]
+ def __init__(self, typeCode, kwargs={}):
+ self._type = typeCode
+ self._access = "RO"
+ self._isIndex = False
+ self._isOptional = False
+ self._unit = None
+ self._min = None
+ self._max = None
+ self._maxlen = None
+ self._desc = None
+ self._reference = None
+ self._isParentRef = False
+
+ for key, value in kwargs.items():
+ if key == "access":
+ value = str(value).upper()
+ if value not in self._access_strings:
+ raise TypeError("invalid value for access parameter: '%s':" % value )
+ self._access = value
+ elif key == "index" : self._isIndex = _toBool(value)
+ elif key == "optional": self._isOptional = _toBool(value)
+ elif key == "unit" : self._unit = value
+ elif key == "min" : self._min = value
+ elif key == "max" : self._max = value
+ elif key == "maxlen" : self._maxlen = value
+ elif key == "desc" : self._desc = value
+ elif key == "reference" : self._reference = value
+ elif key == "parent_ref" : self._isParentRef = _toBool(value)
+
+ def getType(self): return self._type
+
+ def getAccess(self): return self._access
+
+ def isOptional(self): return self._isOptional
+
+ def isIndex(self): return self._isIndex
+
+ def getUnit(self): return self._unit
+
+ def getMin(self): return self._min
+
+ def getMax(self): return self._max
+
+ def getMaxLen(self): return self._maxlen
+
+ def getDesc(self): return self._desc
+
+ def getReference(self): return self._reference
+
+ def isParentRef(self): return self._isParentRef
+
+ def mapEncode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _map["amqp_type"] = self._type
+ _map["access"] = self._access
+ _map["index"] = self._isIndex
+ _map["optional"] = self._isOptional
+ if self._unit: _map["unit"] = self._unit
+ if self._min: _map["min"] = self._min
+ if self._max: _map["max"] = self._max
+ if self._maxlen: _map["maxlen"] = self._maxlen
+ if self._desc: _map["desc"] = self._desc
+ if self._reference: _map["reference"] = self._reference
+ _map["parent_ref"] = self._isParentRef
+ return _map
+
+ def __repr__(self): return str(self.mapEncode())
+
+ def _updateHash(self, hasher):
+ """
+ Update the given hash object with a hash computed over this schema.
+ """
+ hasher.update(str(self._type))
+ hasher.update(str(self._isIndex))
+ hasher.update(str(self._isOptional))
+ if self._access: hasher.update(self._access)
+ if self._unit: hasher.update(self._unit)
+ if self._desc: hasher.update(self._desc)
+
+
+
+def SchemaPropertyFactory( _map ):
+ """
+ Factory for constructing SchemaProperty from a map
+
+ @type _map: dict
+ @param _map: from mapEncode() of SchemaProperty
+ @rtype: SchemaProperty
+ @returns: a new SchemaProperty instance
+ """
+ if type(_map) == dict:
+ # construct from map
+ if "amqp_type" not in _map:
+ raise TypeError("requires 'amqp_type' value")
+
+ return SchemaProperty( _map["amqp_type"], _map )
+
+ else:
+ raise TypeError("Invalid type for SchemaProperty construction")
+
+
+
+
+class SchemaArgument(object):
+ """
+ Describes the structure of an Argument to a Method call.
+ Map format:
+ map["amqp_type"] = int, type code indicating argument's data type
+ map["dir"] = str, direction for an argument associated with a
+ Method, "I"|"O"|"IO", default value: "I"
+ optional:
+ map["desc"] = str, human-readable description of this argument
+ map["default"] = by amqp_type, default value to use if none provided
+ """
+ __hash__ = None
+ _dir_strings = ["I", "O", "IO"]
+ def __init__(self, typeCode, kwargs={}):
+ self._type = typeCode
+ self._dir = "I"
+ self._desc = None
+ self._default = None
+
+ for key, value in kwargs.items():
+ if key == "dir":
+ value = str(value).upper()
+ if value not in self._dir_strings:
+ raise TypeError("invalid value for dir parameter: '%s'" % value)
+ self._dir = value
+ elif key == "desc" : self._desc = value
+ elif key == "default" : self._default = value
+
+ def getType(self): return self._type
+
+ def getDirection(self): return self._dir
+
+ def getDesc(self): return self._desc
+
+ def getDefault(self): return self._default
+
+ def mapEncode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _map["amqp_type"] = self._type
+ _map["dir"] = self._dir
+ # optional:
+ if self._default: _map["default"] = self._default
+ if self._desc: _map["desc"] = self._desc
+ return _map
+
+ def __repr__(self): return str(self.mapEncode())
+
+ def _updateHash(self, hasher):
+ """
+ Update the given hash object with a hash computed over this schema.
+ """
+ hasher.update(str(self._type))
+ hasher.update(self._dir)
+ if self._desc: hasher.update(self._desc)
+
+
+
+def SchemaArgumentFactory( param ):
+ """
+ Factory for constructing SchemaArguments from various sources
+
+ @type param: various
+ @param param: object to use for constructing SchemaArgument
+ @rtype: SchemaArgument
+ @returns: a new SchemaArgument instance
+ """
+ if type(param) == dict:
+ # construct from map
+ if not "amqp_type" in param:
+ raise TypeError("requires 'amqp_type' value")
+
+ return SchemaArgument( param["amqp_type"], param )
+
+ else:
+ raise TypeError("Invalid type for SchemaArgument construction")
+
+
+
+class SchemaMethod(object):
+ """
+ The SchemaMethod class describes the method's structure, and contains a
+ SchemaProperty class for each argument declared by the method.
+
+ Map format:
+ map["arguments"] = map of "name"=<SchemaArgument> pairs.
+ map["desc"] = str, description of the method
+ """
+ def __init__(self, args={}, _desc=None):
+ """
+ Construct a SchemaMethod.
+
+ @type args: map of "name"=<SchemaProperty> objects
+ @param args: describes the arguments accepted by the method
+ @type _desc: str
+ @param _desc: Human-readable description of the schema
+ """
+ self._arguments = args.copy()
+ self._desc = _desc
+
+ def getDesc(self): return self._desc
+
+ def getArgCount(self): return len(self._arguments)
+
+ def getArguments(self): return self._arguments.copy()
+
+ def getArgument(self, name): return self._arguments[name]
+
+ def addArgument(self, name, schema):
+ """
+ Add an argument to the list of arguments passed to this method.
+ Used by an agent for dynamically creating method schema.
+
+ @type name: string
+ @param name: name of new argument
+ @type schema: SchemaProperty
+ @param schema: SchemaProperty to add to this method
+ """
+ if not isinstance(schema, SchemaProperty):
+ raise TypeError("argument must be a SchemaProperty class")
+ self._arguments[name] = schema
+
+ def mapEncode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _args = {}
+ for name,val in self._arguments.iteritems():
+ _args[name] = val.mapEncode()
+ _map["arguments"] = _args
+ if self._desc: _map["desc"] = self._desc
+ return _map
+
+ def __repr__(self):
+ result = "("
+ first = True
+ for name,arg in self._arguments.iteritems():
+ if arg._dir.find("I") != -1:
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += name
+ result += ")"
+ return result
+
+ def _updateHash(self, hasher):
+ """
+ Update the given hash object with a hash computed over this schema.
+ """
+ for name,val in self._arguments.iteritems():
+ hasher.update(name)
+ val._updateHash(hasher)
+ if self._desc: hasher.update(self._desc)
+
+
+
+def SchemaMethodFactory( param ):
+ """
+ Factory for constructing a SchemaMethod from various sources
+
+ @type param: various
+ @param param: object to use for constructing a SchemaMethod
+ @rtype: SchemaMethod
+ @returns: a new SchemaMethod instance
+ """
+ if type(param) == dict:
+ # construct from map
+ args = {}
+ desc = None
+ if "arguments" in param:
+ for name,val in param["arguments"].iteritems():
+ args[name] = SchemaArgumentFactory(val)
+ if "desc" in param:
+ desc = param["desc"]
+
+ return SchemaMethod( args, desc )
+
+ else:
+ raise TypeError("Invalid type for SchemaMethod construction")
+
+
+
+class SchemaClass(object):
+ """
+ Base class for Data and Event Schema classes.
+ """
+ def __init__(self, pname, cname, stype, desc=None, hstr=None):
+ """
+ Schema Class constructor.
+
+ @type pname: str
+ @param pname: package name
+ @type cname: str
+ @param cname: class name
+ @type stype: str
+ @param stype: type of schema, either "data" or "event"
+ @type desc: str
+ @param desc: Human-readable description of the schema
+ @type hstr: str
+ @param hstr: hash computed over the schema body, else None
+ """
+ self._pname = pname
+ self._cname = cname
+ self._type = stype
+ self._desc = desc
+ if hstr:
+ self._classId = SchemaClassId( pname, cname, stype, hstr )
+ else:
+ self._classId = None
+ self._properties = {}
+ self._methods = {}
+ self._pkeyNames = []
+
+
+ def getClassId(self):
+ if not self._classId:
+ self.generateHash()
+ return self._classId
+
+ def getDesc(self): return self._desc
+
+ def generateHash(self):
+ """
+ generate an md5 hash over the body of the schema,
+ and return a string representation of the hash
+ in format "%08x-%08x-%08x-%08x"
+ """
+ md5Hash = _md5Obj()
+ md5Hash.update(self._pname)
+ md5Hash.update(self._cname)
+ md5Hash.update(self._type)
+ for name,x in self._properties.iteritems():
+ md5Hash.update(name)
+ x._updateHash( md5Hash )
+ for name,x in self._methods.iteritems():
+ md5Hash.update(name)
+ x._updateHash( md5Hash )
+ idx = 0
+ for name in self._pkeyNames:
+ md5Hash.update(str(idx) + name)
+ idx += 1
+ hstr = md5Hash.hexdigest()[0:8] + "-" +\
+ md5Hash.hexdigest()[8:16] + "-" +\
+ md5Hash.hexdigest()[16:24] + "-" +\
+ md5Hash.hexdigest()[24:32]
+ # update classId with new hash value
+ self._classId = SchemaClassId( self._pname,
+ self._cname,
+ self._type,
+ hstr )
+ return hstr
+
+
+ def mapEncode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _map["schema_id"] = self.getClassId().mapEncode()
+ _map["desc"] = self._desc
+ if len(self._properties):
+ _props = {}
+ for name,val in self._properties.iteritems():
+ _props[name] = val.mapEncode()
+ _map["properties"] = _props
+
+ if len(self._methods):
+ _meths = {}
+ for name,val in self._methods.iteritems():
+ _meths[name] = val.mapEncode()
+ _map["methods"] = _meths
+
+ if len(self._pkeyNames):
+ _map["primary_key"] = self._pkeyNames[:]
+ return _map
+
+ def __repr__(self):
+ return str(self.getClassId())
+
+
+
+class SchemaObjectClass(SchemaClass):
+ """
+ A schema class that describes a data object. The data object is composed
+ of zero or more properties and methods. An instance of the SchemaObjectClass
+ can be identified using a key generated by concantenating the values of
+ all properties named in the primary key list.
+
+ Map format:
+ map["schema_id"] = map, SchemaClassId map for this object.
+ map["desc"] = human readable description of this schema.
+ map["primary_key"] = ordered list of property names used to construct the Primary Key"
+ map["properties"] = map of "name":SchemaProperty instances.
+ map["methods"] = map of "name":SchemaMethods instances.
+ """
+ def __init__( self, pname, cname, desc=None, _hash=None,
+ _props={}, _pkey=[], _methods={}):
+ """
+ @type pname: str
+ @param pname: name of package this schema belongs to
+ @type cname: str
+ @param cname: class name for this schema
+ @type desc: str
+ @param desc: Human-readable description of the schema
+ @type _hash: str
+ @param _methods: hash computed on the body of this schema, if known
+ @type _props: map of 'name':<SchemaProperty> objects
+ @param _props: all properties provided by this schema
+ @type _pkey: list of strings
+ @param _pkey: names of each property to be used for constructing the primary key
+ @type _methods: map of 'name':<SchemaMethod> objects
+ @param _methods: all methods provided by this schema
+ """
+ super(SchemaObjectClass, self).__init__(pname,
+ cname,
+ SchemaTypeData,
+ desc,
+ _hash)
+ self._properties = _props.copy()
+ self._pkeyNames = _pkey[:]
+ self._methods = _methods.copy()
+
+ def getPrimaryKeyList(self): return self._pkeyNames[:]
+
+ def getPropertyCount(self): return len(self._properties)
+ def getProperties(self): return self._properties.copy()
+ def getProperty(self, name): return self._properties[name]
+
+ def getMethodCount(self): return len(self._methods)
+ def getMethods(self): return self._methods.copy()
+ def getMethod(self, name): return self._methods[name]
+
+ def addProperty(self, name, prop):
+ self._properties[name] = prop
+ # need to re-generate schema hash
+ self._classId = None
+
+ def addMethod(self, name, method):
+ self._methods[name] = method
+ self._classId = None
+
+
+
+def SchemaObjectClassFactory( param ):
+ """
+ Factory for constructing a SchemaObjectClass from various sources.
+
+ @type param: various
+ @param param: object to use for constructing a SchemaObjectClass instance
+ @rtype: SchemaObjectClass
+ @returns: a new SchemaObjectClass instance
+ """
+ if type(param) == dict:
+ classId = None
+ properties = {}
+ methods = {}
+ pkey = []
+ if "schema_id" in param:
+ classId = SchemaClassIdFactory(param["schema_id"])
+ if (not classId) or (classId.getType() != SchemaTypeData):
+ raise TypeError("Invalid SchemaClassId specified: %s" % classId)
+ if "desc" in param:
+ desc = param["desc"]
+ if "primary_key" in param:
+ pkey = param["primary_key"]
+ if "properties" in param:
+ for name,val in param["properties"].iteritems():
+ properties[name] = SchemaPropertyFactory(val)
+ if "methods" in param:
+ for name,val in param["methods"].iteritems():
+ methods[name] = SchemaMethodFactory(val)
+
+ return SchemaObjectClass( classId.getPackageName(),
+ classId.getClassName(),
+ desc,
+ _hash = classId.getHashString(),
+ _props = properties, _pkey = pkey,
+ _methods = methods)
+
+ else:
+ raise TypeError("Invalid type for SchemaObjectClass construction")
+
+
+
+class SchemaEventClass(SchemaClass):
+ """
+ A schema class that describes an event. The event is composed
+ of zero or more properties.
+
+ Map format:
+ map["schema_id"] = map, SchemaClassId map for this object.
+ map["desc"] = string description of this schema
+ map["properties"] = map of "name":SchemaProperty values.
+ """
+ def __init__( self, pname, cname, desc=None, _props={}, _hash=None ):
+ super(SchemaEventClass, self).__init__(pname,
+ cname,
+ SchemaTypeEvent,
+ desc,
+ _hash )
+ self._properties = _props.copy()
+
+ def getPropertyCount(self): return len(self._properties)
+ def getProperties(self): return self._properties.copy()
+ def getProperty(self, name): return self._properties[name]
+ def addProperty(self, name, prop):
+ self._properties[name] = prop
+ # need to re-generate schema hash
+ self._classId = None
+
+ def mapEncode(self):
+ _map = super(SchemaEventClass, self).mapEncode()
+ return _map
+
+
+
+def SchemaEventClassFactory( param ):
+ """
+ Factory for constructing a SchemaEventClass from various sources.
+
+ @type param: various
+ @param param: object to use for constructing a SchemaEventClass instance
+ @rtype: SchemaEventClass
+ @returns: a new SchemaEventClass instance
+ """
+ if type(param) == dict:
+ logging.debug( "constructing SchemaEventClass from map '%s'" % param )
+ classId = None
+ properties = {}
+ if "schema_id" in param:
+ classId = SchemaClassIdFactory(param["schema_id"])
+ if (not classId) or (classId.getType() != SchemaTypeEvent):
+ raise TypeError("Invalid SchemaClassId specified: %s" % classId)
+ if "desc" in param:
+ desc = param["desc"]
+ if "properties" in param:
+ for name,val in param["properties"].iteritems():
+ properties[name] = SchemaPropertyFactory(val)
+
+ return SchemaEventClass( classId.getPackageName(),
+ classId.getClassName(),
+ desc,
+ _hash = classId.getHashString(),
+ _props = properties )
+ else:
+ raise TypeError("Invalid type for SchemaEventClass construction")
+
+
+
+
+
+
+
+
+
diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py
new file mode 100644
index 0000000000..ec22bbbb50
--- /dev/null
+++ b/qpid/python/qmf/qmfConsole.py
@@ -0,0 +1,1129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import os
+import logging
+import platform
+import time
+import Queue
+from threading import Thread
+from threading import Lock
+from threading import currentThread
+from threading import Condition
+
+from qpid.messaging import *
+
+from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION,
+ AMQP_QMF_AGENT_LOCATE)
+from qmfCommon import AgentId
+
+
+
+# global flag that indicates which thread (if any) is
+# running the console callback
+_callback_thread=None
+
+
+
+
+##==============================================================================
+## Sequence Manager
+##==============================================================================
+
+class _Mailbox(object):
+ """
+ Virtual base class for all Mailbox-like objects
+ """
+ def __init__(self):
+ self._msgs = []
+ self._cv = Condition()
+ self._waiting = False
+
+ def deliver(self, obj):
+ self._cv.acquire()
+ try:
+ self._msgs.append(obj)
+ # if was empty, notify waiters
+ if len(self._msgs) == 1:
+ self._cv.notify()
+ finally:
+ self._cv.release()
+
+ def fetch(self, timeout=None):
+ self._cv.acquire()
+ try:
+ if len(self._msgs):
+ return self._msgs.pop()
+ self._cv.wait(timeout)
+ if len(self._msgs):
+ return self._msgs.pop()
+ return None
+ finally:
+ self._cv.release()
+
+
+
+class SequencedWaiter(object):
+ """
+ Manage sequence numbers for asynchronous method calls.
+ Allows the caller to associate a generic piece of data with a unique sequence
+ number."""
+
+ def __init__(self):
+ self.lock = Lock()
+ self.sequence = 1L
+ self.pending = {}
+
+
+ def allocate(self):
+ """
+ Reserve a sequence number.
+
+ @rtype: long
+ @return: a unique nonzero sequence number.
+ """
+ self.lock.acquire()
+ try:
+ seq = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[seq] = _Mailbox()
+ finally:
+ self.lock.release()
+ logging.debug( "sequence %d allocated" % seq)
+ return seq
+
+
+ def put_data(self, seq, new_data):
+ seq = long(seq)
+ logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ self.pending[seq].deliver(new_data)
+ else:
+ logging.error( "seq %d not found!" % seq )
+ finally:
+ self.lock.release()
+
+
+
+ def get_data(self, seq, timeout=None):
+ """
+ Release a sequence number reserved using the reserve method. This must
+ be called when the sequence is no longer needed.
+
+ @type seq: int
+ @param seq: a sequence previously allocated by calling reserve().
+ @rtype: any
+ @return: the data originally associated with the reserved sequence number.
+ """
+ seq = long(seq)
+ logging.debug( "getting data for seq=%d" % seq)
+ mbox = None
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ mbox = self.pending[seq]
+ finally:
+ self.lock.release()
+
+ # Note well: pending list is unlocked, so we can wait.
+ # we reference mbox locally, so it will not be released
+ # until we are done.
+
+ if mbox:
+ d = mbox.fetch(timeout)
+ logging.debug( "seq %d fetched %r!" % (seq, d) )
+ return d
+
+ logging.debug( "seq %d not found!" % seq )
+ return None
+
+
+ def release(self, seq):
+ """
+ Release the sequence, and its mailbox
+ """
+ seq = long(seq)
+ logging.debug( "releasing seq %d" % seq )
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ del self.pending[seq]
+ finally:
+ self.lock.release()
+
+
+
+#class ObjectProxy(QmfObject):
+class ObjectProxy(object):
+ """
+ A local representation of a QmfObject that is managed by a remote agent.
+ """
+ def __init__(self, agent, cls, kwargs={}):
+ """
+ @type agent: qmfConsole.AgentProxy
+ @param agent: Agent that manages this object.
+ @type cls: qmfCommon.SchemaObjectClass
+ @param cls: Schema that describes the class.
+ @type kwargs: dict
+ @param kwargs: ??? supported keys ???
+ """
+ # QmfObject.__init__(self, cls, kwargs)
+ self._agent = agent
+
+ # def update(self):
+ def refresh(self, timeout = None):
+ """
+ Called to re-fetch the current state of the object from the agent. This updates
+ the contents of the object to their most current values.
+
+ @rtype: bool
+ @return: True if refresh succeeded. Refresh may fail if agent does not respond.
+ """
+ if not self._agent:
+ raise Exception("No Agent associated with this object")
+ newer = self._agent.get_object(Query({"object_id":object_id}), timeout)
+ if newer == None:
+ logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
+ raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
+ self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class
+
+ ### def _merge_update(self, newerObject):
+ ### ??? in Rafi's console.py::Object Class
+
+
+ ### def is_deleted(self):
+ ### ??? in Rafi's console.py::Object Class
+
+ def key(self): pass
+
+
+
+
+class AgentProxy(object):
+ """
+ A local representation of a remote agent managed by this console.
+ """
+ def __init__(self, name):
+ """
+ @type name: AgentId
+ @param name: uniquely identifies this agent in the AMQP domain.
+ """
+ if not name or not isinstance(name, AgentId):
+ raise Exception( "Attempt to create an Agent without supplying a valid agent name." );
+
+ self._name = name
+ self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name)
+ self._console = None
+ self._sender = None
+ self._packages = [] # list of package names known to this agent
+ self._classes = {} # dict [key:class] of classes known to this agent
+ self._subscriptions = [] # list of active standing subscriptions for this agent
+ self._exists = False # true when Agent Announce is received from this agent
+ logging.debug( "Created AgentProxy with address: [%s]" % self._address )
+
+
+ def key(self):
+ return str(self._name)
+
+
+ def _send_msg(self, msg):
+ """
+ Low-level routine to asynchronously send a message to this agent.
+ """
+ msg.reply_to = self._console.address()
+ handle = self._console._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a sequence id!")
+ msg.correlation_id = str(handle)
+ self._sender.send(msg)
+ return handle
+
+
+
+ def _fetch_reply_msg(self, handle, timeout=None):
+ """
+ Low-level routine to wait for an expected reply from the agent.
+ """
+ if handle == 0:
+ raise Exception("Invalid handle")
+ msg = self._console._req_correlation.get_data( handle, timeout )
+ if not msg:
+ logging.debug("timed out waiting for reply message")
+ self._console._req_correlation.release( handle )
+ return msg
+
+
+ def get_packages(self):
+ """
+ Return a list of the names of all packages known to this agent.
+ """
+ return self._packages[:]
+
+ def get_classes(self):
+ """
+ Return a dictionary [key:class] of classes known to this agent.
+ """
+ return self._classes[:]
+
+ def get_objects(self, query, kwargs={}):
+ """
+ Return a list of objects that satisfy the given query.
+
+ @type query: dict, or qmfCommon.Query
+ @param query: filter for requested objects
+ @type kwargs: dict
+ @param kwargs: ??? used to build match selector and query ???
+ @rtype: list
+ @return: list of matching objects, or None.
+ """
+ pass
+
+ def get_object(self, query, kwargs={}):
+ """
+ Get one object - query is expected to match only one object.
+ ??? Recommended: explicit timeout param, default None ???
+
+ @type query: dict, or qmfCommon.Query
+ @param query: filter for requested objects
+ @type kwargs: dict
+ @param kwargs: ??? used to build match selector and query ???
+ @rtype: qmfConsole.ObjectProxy
+ @return: one matching object, or none
+ """
+ pass
+
+
+ def create_subscription(self, query):
+ """
+ Factory for creating standing subscriptions based on a given query.
+
+ @type query: qmfCommon.Query object
+ @param query: determines the list of objects for which this subscription applies
+ @rtype: qmfConsole.Subscription
+ @returns: an object representing the standing subscription.
+ """
+ pass
+
+ def __repr__(self):
+ return self._address
+
+ def __str__(self):
+ return self.__repr__()
+
+
+
+ ##==============================================================================
+ ## CONSOLE
+ ##==============================================================================
+
+
+
+class WorkItem(object):
+ """
+ Describes an event that has arrived at the Console for the
+ application to process. The Notifier is invoked when one or
+ more of these WorkItems become available for processing.
+ """
+ #
+ # Enumeration of the types of WorkItems produced by the Console
+ #
+ AGENT_ADDED = 1
+ AGENT_DELETED = 2
+ NEW_PACKAGE = 3
+ NEW_CLASS = 4
+ OBJECT_UPDATE = 5
+ EVENT_RECEIVED = 7
+ AGENT_HEARTBEAT = 8
+
+ def __init__(self, kind, kwargs={}):
+ """
+ Used by the Console to create a work item.
+
+ @type kind: int
+ @param kind: work item type
+ """
+ self._kind = kind
+ self._param_map = kwargs
+
+
+ def getType(self):
+ return self._kind
+
+ def getParams(self):
+ return self._param_map
+
+
+
+class Notifier(object):
+ """
+ Virtual base class that defines a call back which alerts the application that
+ a QMF Console notification is pending.
+ """
+ def console_indication(self):
+ """
+ Called when one or more console items are ready for the console application to process.
+ This method may be called by the internal console management thread. Its purpose is to
+ indicate that the console application should process pending items.
+ """
+ pass
+
+
+
+
+class Console(Thread):
+ """
+ A Console manages communications to a collection of agents on behalf of an application.
+ """
+ def __init__(self, name=None, notifier=None, kwargs={}):
+ """
+ @type name: str
+ @param name: identifier for this console. Must be unique.
+ @type notifier: qmfConsole.Notifier
+ @param notifier: invoked when events arrive for processing.
+ @type kwargs: dict
+ @param kwargs: ??? Unused
+ """
+ Thread.__init__(self)
+ self._name = name
+ if not self._name:
+ self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
+ self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name
+ self._notifier = notifier
+ self._conn = None
+ self._session = None
+ # dict of "agent-direct-address":AgentProxy entries
+ self._agent_map = {}
+ self._agent_map_lock = Lock()
+ self._direct_recvr = None
+ self._announce_recvr = None
+ self._locate_sender = None
+ self._schema_cache = {}
+ self._req_correlation = SequencedWaiter()
+ self._operational = False
+ self._agent_discovery = False
+ # lock out run() thread
+ self._cv = Condition()
+ # for passing WorkItems to the application
+ self._work_q = Queue.Queue()
+ ## Old stuff below???
+ #self._broker_list = []
+ #self.impl = qmfengine.Console()
+ #self._event = qmfengine.ConsoleEvent()
+ ##self._cv = Condition()
+ ##self._sync_count = 0
+ ##self._sync_result = None
+ ##self._select = {}
+ ##self._cb_cond = Condition()
+
+
+
+ def destroy(self, timeout=None):
+ """
+ Must be called before the Console is deleted.
+ Frees up all resources and shuts down all background threads.
+
+ @type timeout: float
+ @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
+ """
+ logging.debug("Destroying Console...")
+ if self._conn:
+ self.remove_connection(self._conn, timeout)
+ logging.debug("Console Destroyed")
+
+
+
+ def add_connection(self, conn):
+ """
+ Add a AMQP connection to the console. The console will setup a session over the
+ connection. The console will then broadcast an Agent Locate Indication over
+ the session in order to discover present agents.
+
+ @type conn: qpid.messaging.Connection
+ @param conn: the connection to the AMQP messaging infrastructure.
+ """
+ if self._conn:
+ raise Exception( "Multiple connections per Console not supported." );
+ self._conn = conn
+ self._session = conn.session(name=self._name)
+ self._direct_recvr = self._session.receiver(self._address)
+ self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION)
+ self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE)
+ #
+ # Now that receivers are created, fire off the receive thread...
+ #
+ self._operational = True
+ self.start()
+
+
+
+ def remove_connection(self, conn, timeout=None):
+ """
+ Remove an AMQP connection from the console. Un-does the add_connection() operation,
+ and releases any agents and sessions associated with the connection.
+
+ @type conn: qpid.messaging.Connection
+ @param conn: connection previously added by add_connection()
+ """
+ if self._conn and conn and conn != self._conn:
+ logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+
+ # tell connection thread to shutdown
+ self._operational = False
+ if self.isAlive():
+ # kick my thread to wake it up
+ logging.debug("Making temp sender for [%s]" % self._address)
+ tmp_sender = self._session.sender(self._address)
+ try:
+ msg = Message(subject="qmf4",
+ properties={"method":"request",
+ "opcode":"console-ping"},
+ content={"data":"ignore"})
+ tmp_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+ logging.debug("waiting for console receiver thread to exit")
+ self.join(timeout)
+ if self.isAlive():
+ logging.error( "Console thread '%s' is hung..." % self.getName() )
+ self._direct_recvr.close()
+ self._announce_recvr.close()
+ self._locate_sender.close()
+ self._session = None
+ self._conn = None
+ logging.debug("console connection removal complete")
+
+
+ def address(self):
+ """
+ The AMQP address this Console is listening to.
+ """
+ return self._address
+
+
+ def create_agent( self, agent_name ):
+ """
+ Factory to create/retrieve an agent for this console
+ """
+ if not isinstance(agent_name, AgentId):
+ raise TypeError("agent_name must be an instance of AgentId")
+
+ agent = AgentProxy(agent_name)
+
+ self._agent_map_lock.acquire()
+ try:
+ if agent_name in self._agent_map:
+ return self._agent_map[agent_name]
+
+ agent._console = self
+ agent._sender = self._session.sender(agent._address)
+ self._agent_map[agent_name] = agent
+ finally:
+ self._agent_map_lock.release()
+
+ return agent
+
+
+
+ def destroy_agent( self, agent ):
+ """
+ Undoes create.
+ """
+ if not isinstance(agent, AgentProxy):
+ raise TypeError("agent must be an instance of AgentProxy")
+
+ self._agent_map_lock.acquire()
+ try:
+ if agent._name in self._agent_map:
+ del self._agent_map[agent._name]
+ finally:
+ self._agent_map_lock.release()
+
+
+
+
+ def find_agent(self, agent_name, timeout=None ):
+ """
+ Given the name of a particular agent, return an AgentProxy representing
+ that agent. Return None if the agent does not exist.
+ """
+ self._agent_map_lock.acquire()
+ try:
+ if agent_name in self._agent_map:
+ return self._agent_map[agent_name]
+ finally:
+ self._agent_map_lock.release()
+
+ new_agent = self.create_agent(agent_name)
+ msg = Message(subject="qmf4",
+ properties={"method":"request",
+ "opcode":"agent-locate"},
+ content={"query": {"vendor" : agent_name.vendor(),
+ "product" : agent_name.product(),
+ "name" : agent_name.name()}})
+ handle = new_agent._send_msg(msg)
+ if handle == 0:
+ raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name))
+
+ msg = new_agent._fetch_reply_msg(handle, timeout)
+ if not msg:
+ logging.debug("Unable to contact agent '%s' - no reply." % agent_name)
+ self.destroy_agent(new_agent)
+ return None
+ # @todo - for now, dump the message
+ logging.info( "agent-locate reply received for %s" % agent_name)
+ return new_agent
+
+
+
+ def run(self):
+ global _callback_thread
+ #
+ # @todo KAG Rewrite when api supports waiting on multiple receivers
+ #
+ while self._operational:
+
+ qLen = self._work_q.qsize()
+
+ try:
+ msg = self._announce_recvr.fetch(timeout = 0)
+ if msg:
+ self._rcv_announce(msg)
+ except:
+ pass
+
+ try:
+ msg = self._direct_recvr.fetch(timeout = 0)
+ if msg:
+ self._rcv_direct(msg)
+ except:
+ pass
+
+ # try:
+ # logging.error("waiting for next rcvr...")
+ # rcvr = self._session.next_receiver()
+ # except:
+ # logging.error("exception during next_receiver()")
+
+ # logging.error("rcvr=%s" % str(rcvr))
+
+
+ if qLen == 0 and self._work_q.qsize() and self._notifier:
+ # work queue went non-empty, kick
+ # the application...
+
+ _callback_thread = currentThread()
+ logging.info("Calling console indication")
+ self._notifier.console_indication()
+ _callback_thread = None
+
+ while self._operational and \
+ self._announce_recvr.pending() == 0 and \
+ self._direct_recvr.pending():
+ time.sleep(0.5)
+
+ logging.debug("Shutting down Console thread")
+
+
+
+ # called by run() thread ONLY
+ #
+ def _rcv_announce(self, msg):
+ """
+ PRIVATE: Process a message received on the announce receiver
+ """
+ logging.info( "Announce message received!" )
+ if msg.subject != "qmf4":
+ logging.debug("Ignoring non-qmf message '%s'" % msg.subject)
+ return
+
+ amap = {}
+ if msg.content_type == "amqp/map":
+ amap = msg.content
+
+ if (not msg.properties or
+ not "method" in msg.properties or
+ not "opcode" in msg.properties):
+ logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties))
+ return
+
+ if msg.properties["method"] == "indication":
+ # agent indication
+ if msg.properties["opcode"] == "agent":
+ if "name" in amap:
+ if self._agent_discovery:
+ ind = amap["name"]
+ if "vendor" in ind and "product" in ind and "name" in ind:
+
+ agent = self.create_agent(AgentId( ind["vendor"],
+ ind["product"],
+ ind["name"] ))
+ if not agent._exists:
+ # new agent
+ agent._exists = True
+ logging.info("AGENT_ADDED for %s" % agent)
+ wi = WorkItem(WorkItem.AGENT_ADDED,
+ {"agent": agent})
+ self._work_q.put(wi)
+ else:
+ logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ % msg.properties["opcode"])
+ else:
+ logging.warning("Ignoring message with unrecognized 'method' value: '%s'"
+ % msg.properties["method"] )
+
+
+
+
+ # called by run() thread ONLY
+ #
+ def _rcv_direct(self, msg):
+ """
+ PRIVATE: Process a message sent to my direct receiver
+ """
+ logging.info( "direct message received!" )
+ if msg.correlation_id:
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+
+ def enable_agent_discovery(self):
+ """
+ Called to enable the asynchronous Agent Discovery process.
+ Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
+ """
+ if not self._agent_discovery:
+ self._agent_discovery = True
+ msg = Message(subject="qmf4",
+ properties={"method":"request",
+ "opcode":"agent-locate"},
+ content={"query": {"vendor": "*",
+ "product": "*",
+ "name": "*"}})
+ self._locate_sender.send(msg)
+
+
+
+ def disable_agent_discovery(self):
+ """
+ Called to disable the async Agent Discovery process enabled by
+ calling enable_agent_discovery()
+ """
+ self._agent_discovery = False
+
+
+
+ def get_workitem_count(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ return self._work_q.qsize()
+
+
+
+ def get_next_workitem(self, timeout=None):
+ """
+ Returns the next pending work item, or None if none available.
+ @todo: subclass and return an Empty event instead.
+ """
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
+
+
+ def release_workitem(self, wi):
+ """
+ Return a WorkItem to the Console when it is no longer needed.
+ @todo: call Queue.task_done() - only 2.5+
+
+ @type wi: class qmfConsole.WorkItem
+ @param wi: work item object to return.
+ """
+ pass
+
+
+
+ # def get_packages(self):
+ # plist = []
+ # for i in range(self.impl.packageCount()):
+ # plist.append(self.impl.getPackageName(i))
+ # return plist
+
+
+ # def get_classes(self, package, kind=CLASS_OBJECT):
+ # clist = []
+ # for i in range(self.impl.classCount(package)):
+ # key = self.impl.getClass(package, i)
+ # class_kind = self.impl.getClassKind(key)
+ # if class_kind == kind:
+ # if kind == CLASS_OBJECT:
+ # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
+ # elif kind == CLASS_EVENT:
+ # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
+ # return clist
+
+
+ # def bind_package(self, package):
+ # return self.impl.bindPackage(package)
+
+
+ # def bind_class(self, kwargs = {}):
+ # if "key" in kwargs:
+ # self.impl.bindClass(kwargs["key"])
+ # elif "package" in kwargs:
+ # package = kwargs["package"]
+ # if "class" in kwargs:
+ # self.impl.bindClass(package, kwargs["class"])
+ # else:
+ # self.impl.bindClass(package)
+ # else:
+ # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
+
+
+ # def get_agents(self, broker=None):
+ # blist = []
+ # if broker:
+ # blist.append(broker)
+ # else:
+ # self._cv.acquire()
+ # try:
+ # # copy while holding lock
+ # blist = self._broker_list[:]
+ # finally:
+ # self._cv.release()
+
+ # agents = []
+ # for b in blist:
+ # for idx in range(b.impl.agentCount()):
+ # agents.append(AgentProxy(b.impl.getAgent(idx), b))
+
+ # return agents
+
+
+ # def get_objects(self, query, kwargs = {}):
+ # timeout = 30
+ # agent = None
+ # temp_args = kwargs.copy()
+ # if type(query) == type({}):
+ # temp_args.update(query)
+
+ # if "_timeout" in temp_args:
+ # timeout = temp_args["_timeout"]
+ # temp_args.pop("_timeout")
+
+ # if "_agent" in temp_args:
+ # agent = temp_args["_agent"]
+ # temp_args.pop("_agent")
+
+ # if type(query) == type({}):
+ # query = Query(temp_args)
+
+ # self._select = {}
+ # for k in temp_args.iterkeys():
+ # if type(k) == str:
+ # self._select[k] = temp_args[k]
+
+ # self._cv.acquire()
+ # try:
+ # self._sync_count = 1
+ # self._sync_result = []
+ # broker = self._broker_list[0]
+ # broker.send_query(query.impl, None, agent)
+ # self._cv.wait(timeout)
+ # if self._sync_count == 1:
+ # raise Exception("Timed out: waiting for query response")
+ # finally:
+ # self._cv.release()
+
+ # return self._sync_result
+
+
+ # def get_object(self, query, kwargs = {}):
+ # '''
+ # Return one and only one object or None.
+ # '''
+ # objs = objects(query, kwargs)
+ # if len(objs) == 1:
+ # return objs[0]
+ # else:
+ # return None
+
+
+ # def first_object(self, query, kwargs = {}):
+ # '''
+ # Return the first of potentially many objects.
+ # '''
+ # objs = objects(query, kwargs)
+ # if objs:
+ # return objs[0]
+ # else:
+ # return None
+
+
+ # # Check the object against select to check for a match
+ # def _select_match(self, object):
+ # schema_props = object.properties()
+ # for key in self._select.iterkeys():
+ # for prop in schema_props:
+ # if key == p[0].name() and self._select[key] != p[1]:
+ # return False
+ # return True
+
+
+ # def _get_result(self, list, context):
+ # '''
+ # Called by Broker proxy to return the result of a query.
+ # '''
+ # self._cv.acquire()
+ # try:
+ # for item in list:
+ # if self._select_match(item):
+ # self._sync_result.append(item)
+ # self._sync_count -= 1
+ # self._cv.notify()
+ # finally:
+ # self._cv.release()
+
+
+ # def start_sync(self, query): pass
+
+
+ # def touch_sync(self, sync): pass
+
+
+ # def end_sync(self, sync): pass
+
+
+
+
+# def start_console_events(self):
+# self._cb_cond.acquire()
+# try:
+# self._cb_cond.notify()
+# finally:
+# self._cb_cond.release()
+
+
+# def _do_console_events(self):
+# '''
+# Called by the Console thread to poll for events. Passes the events
+# onto the ConsoleHandler associated with this Console. Is called
+# periodically, but can also be kicked by Console.start_console_events().
+# '''
+# count = 0
+# valid = self.impl.getEvent(self._event)
+# while valid:
+# count += 1
+# try:
+# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+# logging.debug("Console Event AGENT_ADDED received")
+# if self._handler:
+# self._handler.agent_added(AgentProxy(self._event.agent, None))
+# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+# logging.debug("Console Event AGENT_DELETED received")
+# if self._handler:
+# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
+# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+# logging.debug("Console Event NEW_PACKAGE received")
+# if self._handler:
+# self._handler.new_package(self._event.name)
+# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+# logging.debug("Console Event NEW_CLASS received")
+# if self._handler:
+# self._handler.new_class(SchemaClassKey(self._event.classKey))
+# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+# logging.debug("Console Event OBJECT_UPDATE received")
+# if self._handler:
+# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
+# self._event.hasProps, self._event.hasStats)
+# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
+# logging.debug("Console Event EVENT_RECEIVED received")
+# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+# logging.debug("Console Event AGENT_HEARTBEAT received")
+# if self._handler:
+# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
+# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
+# logging.debug("Console Event METHOD_RESPONSE received")
+# else:
+# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+# except e:
+# print "Exception caught in callback thread:", e
+# self.impl.popEvent()
+# valid = self.impl.getEvent(self._event)
+# return count
+
+
+
+
+
+# class Broker(ConnectionHandler):
+# # attr_reader :impl :conn, :console, :broker_bank
+# def __init__(self, console, conn):
+# self.broker_bank = 1
+# self.console = console
+# self.conn = conn
+# self._session = None
+# self._cv = Condition()
+# self._stable = None
+# self._event = qmfengine.BrokerEvent()
+# self._xmtMessage = qmfengine.Message()
+# self.impl = qmfengine.BrokerProxy(self.console.impl)
+# self.console.impl.addConnection(self.impl, self)
+# self.conn.add_conn_handler(self)
+# self._operational = True
+
+
+# def shutdown(self):
+# logging.debug("broker.shutdown() called.")
+# self.console.impl.delConnection(self.impl)
+# self.conn.del_conn_handler(self)
+# if self._session:
+# self.impl.sessionClosed()
+# logging.debug("broker.shutdown() sessionClosed done.")
+# self._session.destroy()
+# logging.debug("broker.shutdown() session destroy done.")
+# self._session = None
+# self._operational = False
+# logging.debug("broker.shutdown() done.")
+
+
+# def wait_for_stable(self, timeout = None):
+# self._cv.acquire()
+# try:
+# if self._stable:
+# return
+# if timeout:
+# self._cv.wait(timeout)
+# if not self._stable:
+# raise Exception("Timed out: waiting for broker connection to become stable")
+# else:
+# while not self._stable:
+# self._cv.wait()
+# finally:
+# self._cv.release()
+
+
+# def send_query(self, query, ctx, agent):
+# agent_impl = None
+# if agent:
+# agent_impl = agent.impl
+# self.impl.sendQuery(query, ctx, agent_impl)
+# self.conn.kick()
+
+
+# def _do_broker_events(self):
+# count = 0
+# valid = self.impl.getEvent(self._event)
+# while valid:
+# count += 1
+# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
+# logging.debug("Broker Event BROKER_INFO received");
+# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+# logging.debug("Broker Event DECLARE_QUEUE received");
+# self.conn.impl.declareQueue(self._session.handle, self._event.name)
+# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+# logging.debug("Broker Event DELETE_QUEUE received");
+# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
+# elif self._event.kind == qmfengine.BrokerEvent.BIND:
+# logging.debug("Broker Event BIND received");
+# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+# logging.debug("Broker Event UNBIND received");
+# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+# logging.debug("Broker Event SETUP_COMPLETE received");
+# self.impl.startProtocol()
+# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
+# logging.debug("Broker Event STABLE received");
+# self._cv.acquire()
+# try:
+# self._stable = True
+# self._cv.notify()
+# finally:
+# self._cv.release()
+# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
+# result = []
+# for idx in range(self._event.queryResponse.getObjectCount()):
+# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
+# self.console._get_result(result, self._event.context)
+# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
+# obj = self._event.context
+# obj._method_result(MethodResponse(self._event.methodResponse()))
+
+# self.impl.popEvent()
+# valid = self.impl.getEvent(self._event)
+
+# return count
+
+
+# def _do_broker_messages(self):
+# count = 0
+# valid = self.impl.getXmtMessage(self._xmtMessage)
+# while valid:
+# count += 1
+# logging.debug("Broker: sending msg on connection")
+# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
+# self.impl.popXmt()
+# valid = self.impl.getXmtMessage(self._xmtMessage)
+
+# return count
+
+
+# def _do_events(self):
+# while True:
+# self.console.start_console_events()
+# bcnt = self._do_broker_events()
+# mcnt = self._do_broker_messages()
+# if bcnt == 0 and mcnt == 0:
+# break;
+
+
+# def conn_event_connected(self):
+# logging.debug("Broker: Connection event CONNECTED")
+# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
+# self.impl.sessionOpened(self._session.handle)
+# self._do_events()
+
+
+# def conn_event_disconnected(self, error):
+# logging.debug("Broker: Connection event DISCONNECTED")
+# pass
+
+
+# def conn_event_visit(self):
+# self._do_events()
+
+
+# def sess_event_session_closed(self, context, error):
+# logging.debug("Broker: Session event CLOSED")
+# self.impl.sessionClosed()
+
+
+# def sess_event_recv(self, context, message):
+# logging.debug("Broker: Session event MSG_RECV")
+# if not self._operational:
+# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+# self.impl.handleRcvMessage(message)
+# self._do_events()
+
+
+
diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py
new file mode 100644
index 0000000000..ea7366945a
--- /dev/null
+++ b/qpid/python/qmf/test/agent_test.py
@@ -0,0 +1,58 @@
+import logging
+import time
+
+from qpid.messaging import *
+from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty,
+ SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed,
+ QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
+ QmfEvent, SchemaMethod)
+from qmfAgent import (Agent, QmfAgentData)
+
+
+class MyAgent(object):
+ def main(self):
+
+ self._agent = Agent( "redhat.com", "qmf", "testAgent" )
+
+ # Dynamically construct a class schema
+
+ _schema = SchemaObjectClass( "MyPackage", "MyClass",
+ desc="A test data schema",
+ _pkey=["index1", "index2"] )
+ # add properties
+ _schema.addProperty( "index1",
+ SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.addProperty( "index2",
+ SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # add method
+ _meth = SchemaMethod( _desc="A test method" )
+ _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
+
+ _schema.addMethod( "meth_3", _meth )
+
+ # Add schema to Agent
+
+ self._agent.registerObjectClass(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ obj = QmfAgentData( self._agent, _schema )
+ obj.setProperty("index1", 100)
+ obj.setProperty("index2", "a name" )
+
+
+ self._agent.addObject( QmfAgentData( self._agent, _schema,
+ _props={"index1":99,
+ "index2": "another name"} ))
+
+
+ return None
+
+
+
+
+app = MyAgent()
+print( "s='%s'", str(app.main()))