diff options
Diffstat (limited to 'qpid/python/qmf/qmfAgent.py')
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 290 |
1 files changed, 192 insertions, 98 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 4b76263903..278eafedbc 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -24,11 +24,10 @@ import time from threading import Thread, Lock from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, - AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject, - parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, - QmfData) - +from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass) ##============================================================================== @@ -36,21 +35,18 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, ##============================================================================== class Agent(Thread): - def __init__(self, vendor, product, name=None, - notifier=None, heartbeat_interval=30, - kwargs={}): + def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, + _max_msg_size=0, _capacity=10): Thread.__init__(self) self._running = False - self.vendor = vendor - self.product = product - if name: - self.name = name - else: - self.name = uuid4().get_urn().split(":")[2] - self._id = AgentId(self.vendor, self.product, self.name) - self._address = str(self._id) - self._notifier = notifier - self._heartbeat_interval = heartbeat_interval + + self.name = str(name) + self._domain = _domain + self._notifier = _notifier + self._heartbeat_interval = _heartbeat_interval + self._max_msg_size = _max_msg_size + self._capacity = _capacity + self._conn = None self._session = None self._lock = Lock() @@ -59,33 +55,48 @@ class Agent(Thread): self._schema = {} self._agent_data = {} - def getAgentId(self): - return AgentId(self.vendor, self.product, self.name) + def get_name(self): + return self.name def setConnection(self, conn): + my_addr = QmfAddress.direct(self.name, self._domain) + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + + logging.debug("my direct addr=%s" % my_addr) + logging.debug("agent.locate addr=%s" % locate_addr) + logging.debug("agent.ind addr=%s" % ind_addr) + self._conn = conn self._session = self._conn.session() - self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10) - self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address, - capacity=10) - self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) + self._direct_receiver = self._session.receiver(str(my_addr) + + ";{create:always," + " node-properties:" + " {type:topic, x-properties: {type:direct}}}", + capacity=self._capacity) + self._locate_receiver = self._session.receiver(str(locate_addr) + + ";{create:always, node-properties:{type:topic}}", + capacity=self._capacity) + self._ind_sender = self._session.sender(str(ind_addr) + + ";{create:always, node-properties:{type:topic}}") + self._running = True self.start() - def registerObjectClass(self, schema): + def register_object_class(self, schema): """ - Register an instance of a SchemaObjectClass with this agent + Register an instance of a SchemaClass with this agent """ # @todo: need to update subscriptions # @todo: need to mark schema as "non-const" - if not isinstance(schema, SchemaObjectClass): - raise TypeError("SchemaObjectClass instance expected") + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass instance expected") self._lock.acquire() try: - classId = schema.getClassId() - pname = classId.getPackageName() - cname = classId.getClassName() + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() if pname not in self._packages: self._packages[pname] = [cname] else: @@ -96,14 +107,13 @@ class Agent(Thread): finally: self._lock.release() - - def registerEventClass(self, cls): - logging.error("!!!Agent.registerEventClass() TBD!!!") + def register_event_class(self, schema): + return self.register_object_class(schema) def raiseEvent(self, qmfEvent): logging.error("!!!Agent.raiseEvent() TBD!!!") - def addObject(self, data ): + def add_object(self, data ): """ Register an instance of a QmfAgentData object. """ @@ -112,9 +122,13 @@ class Agent(Thread): if not isinstance(data, QmfAgentData): raise TypeError("QmfAgentData instance expected") + id_ = data.get_object_id() + if not id_: + raise TypeError("No identifier assigned to QmfAgentData!") + self._lock.acquire() try: - self._agent_data[data.getObjectId()] = data + self._agent_data[id_] = data finally: self._lock.release() @@ -147,37 +161,32 @@ class Agent(Thread): while self._running: now = datetime.datetime.utcnow() - print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) + # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) if now >= next_heartbeat: self._ind_sender.send(self._makeAgentIndMsg()) logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds - print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) + # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) try: - logging.error("waiting for next rcvr (timeout=%s)..." % timeout) - self._session.next_receiver(timeout = timeout) + self._session.next_receiver(timeout=timeout) except Empty: - pass - except KeyboardInterrupt: - break + continue try: msg = self._locate_receiver.fetch(timeout = 0) - if msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) except Empty: - pass + msg = None + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) try: msg = self._direct_receiver.fetch(timeout = 0) - if msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) except Empty: - pass - - + msg = None + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) # # Private: @@ -187,8 +196,8 @@ class Agent(Thread): """ Create an agent indication message identifying this agent """ - _map = self.getAgentId().mapEncode() - _map["schemaTimestamp"] = self._schema_timestamp + _map = {"_name": self.get_name(), + "_schema_timestamp": self._schema_timestamp} return Message( subject=makeSubject(OpCode.agent_ind), properties={"method":"response"}, content={MsgKey.agent_info: _map}) @@ -241,9 +250,11 @@ class Agent(Thread): reply = True if "method" in props and props["method"] == "request": - if MsgKey.query in cmap: - agentIdMap = self.getAgentId().mapEncode() - reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap)) + query = cmap.get(MsgKey.query) + if query is not None: + # fake a QmfData containing my identifier for the query compare + tmpData = QmfData(_values={"_name": self.get_name()}) + reply = QmfQuery(query).evaluate(tmpData) if reply: try: @@ -272,10 +283,9 @@ class Agent(Thread): if target == QmfQuery._TARGET_PACKAGES: self._queryPackages( msg, query ) elif target == QmfQuery._TARGET_SCHEMA_ID: - self._querySchemaId( msg, query ) + self._querySchema( msg, query, _idOnly=True ) elif target == QmfQuery._TARGET_SCHEMA: - logging.warning("!!! Query TARGET=SCHEMA TBD !!!") - #self._querySchema( query.getPredicate(), _idOnly=False ) + self._querySchema( msg, query) elif target == QmfQuery._TARGET_AGENT: logging.warning("!!! Query TARGET=AGENT TBD !!!") elif target == QmfQuery._TARGET_OBJECT_ID: @@ -294,7 +304,7 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})): + if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})): pnames.append(name) finally: self._lock.release() @@ -312,23 +322,32 @@ class Agent(Thread): logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) - def _querySchemaId( self, msg, query ): + def _querySchema( self, msg, query, _idOnly=False ): """ """ schemas = [] self._lock.acquire() try: - for schemaId in self._schema.iterkeys(): - if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})): - schemas.append(schemaId.mapEncode()) + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) finally: self._lock.release() try: tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + m = Message( subject=makeSubject(OpCode.data_ind), properties={"method":"response"}, - content={MsgKey.schema_id: schemas} ) + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id tmp_snd.send(m) @@ -340,35 +359,45 @@ class Agent(Thread): ##============================================================================== - ## OBJECTS + ## DATA MODEL ##============================================================================== -class QmfAgentData(QmfManaged): +class QmfAgentData(QmfData): """ A managed data object that is owned by an agent. """ - def __init__(self, _agent, _schema, _props={}): - """ - @type _agent: class Agent - @param _agent: the agent that manages this object. - @type _schema: class SchemaObjectClass - @param _schema: the schema used to describe this data object - @type _props: map of "name"=<value> pairs - @param _props: initial values for all properties in this object - """ - super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(), - _schema=_schema, - _props=_props) + + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, + _tag=_tag, _ctime=ctime, + _utime=ctime, _object_id=_object_id, + _schema=_schema, _const=False) + self._agent = agent def destroy(self): - self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000) + self._dtime = long(time.time() * 1000) # @todo: publish change - def setProperty( self, _name, _value): - super(QmfAgentData, self).setProperty(_name, _value) + def is_deleted(self): + return self._dtime == 0 + + def set_value(self, _name, _value, _subType=None): + super(QmfAgentData, self).set_value(_name, _value, _subType) # @todo: publish change + def inc_value(self, name, delta): + """ add the delta to the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") + + def dec_value(self, name, delta): + """ subtract the delta from the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") ################################################################################ @@ -377,22 +406,87 @@ class QmfAgentData(QmfManaged): ################################################################################ if __name__ == '__main__': - import time + # static test cases - no message passing, just exercise API + from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes, + SchemaMethod, SchemaEventClass) + logging.getLogger().setLevel(logging.INFO) - logging.info( "Starting Connection" ) - _c = Connection("localhost") - _c.connect() - #c.start() - logging.info( "Starting Agent" ) - _agent = Agent("redhat.com", "agent", "tross") - _agent.setConnection(_c) + logging.info( "Create an Agent" ) + _agent_name = AgentName("redhat.com", "agent", "tross") + _agent = Agent(str(_agent_name)) - logging.info( "Running Agent" ) - - while True: - try: - time.sleep(10) - except KeyboardInterrupt: - break - + logging.info( "Get agent name: '%s'" % _agent.get_name()) + + logging.info( "Create SchemaObjectClass" ) + + _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"]) + # add properties + _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call + _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod(_desc="Method to set string and int in object." ) + _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + print("Schema Map='%s'" % str(_schema.map_encode())) + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + logging.info( "Create QmfAgentData" ) + + _obj = QmfAgentData( _agent, _schema=_schema ) + _obj.set_value("index1", 100) + _obj.set_value("index2", "a name" ) + _obj.set_value("set_string", "UNSET") + _obj.set_value("set_int", 0) + _obj.set_value("query_count", 0) + _obj.set_value("method_call_count", 0) + + print("Obj1 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object( _obj ) + + _obj = QmfAgentData( _agent, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0}, + _schema=_schema) + + print("Obj2 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object(_obj) + + ############## + + + + logging.info( "Create SchemaEventClass" ) + + _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test data schema", + _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)}) + _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + print("Event Map='%s'" % str(_event.map_encode())) + + _agent.register_event_class(_event) |