summaryrefslogtreecommitdiff
path: root/qpid/python/qmf/qmfAgent.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qmf/qmfAgent.py')
-rw-r--r--qpid/python/qmf/qmfAgent.py290
1 files changed, 192 insertions, 98 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index 4b76263903..278eafedbc 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -24,11 +24,10 @@ import time
from threading import Thread, Lock
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
- AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
- parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey,
- QmfData)
-
+from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+ makeSubject, parseSubject, OpCode, QmfQuery,
+ SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+ SchemaClass)
##==============================================================================
@@ -36,21 +35,18 @@ from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
##==============================================================================
class Agent(Thread):
- def __init__(self, vendor, product, name=None,
- notifier=None, heartbeat_interval=30,
- kwargs={}):
+ def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
+ _max_msg_size=0, _capacity=10):
Thread.__init__(self)
self._running = False
- self.vendor = vendor
- self.product = product
- if name:
- self.name = name
- else:
- self.name = uuid4().get_urn().split(":")[2]
- self._id = AgentId(self.vendor, self.product, self.name)
- self._address = str(self._id)
- self._notifier = notifier
- self._heartbeat_interval = heartbeat_interval
+
+ self.name = str(name)
+ self._domain = _domain
+ self._notifier = _notifier
+ self._heartbeat_interval = _heartbeat_interval
+ self._max_msg_size = _max_msg_size
+ self._capacity = _capacity
+
self._conn = None
self._session = None
self._lock = Lock()
@@ -59,33 +55,48 @@ class Agent(Thread):
self._schema = {}
self._agent_data = {}
- def getAgentId(self):
- return AgentId(self.vendor, self.product, self.name)
+ def get_name(self):
+ return self.name
def setConnection(self, conn):
+ my_addr = QmfAddress.direct(self.name, self._domain)
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+
+ logging.debug("my direct addr=%s" % my_addr)
+ logging.debug("agent.locate addr=%s" % locate_addr)
+ logging.debug("agent.ind addr=%s" % ind_addr)
+
self._conn = conn
self._session = self._conn.session()
- self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10)
- self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address,
- capacity=10)
- self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
+ self._direct_receiver = self._session.receiver(str(my_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic, x-properties: {type:direct}}}",
+ capacity=self._capacity)
+ self._locate_receiver = self._session.receiver(str(locate_addr) +
+ ";{create:always, node-properties:{type:topic}}",
+ capacity=self._capacity)
+ self._ind_sender = self._session.sender(str(ind_addr) +
+ ";{create:always, node-properties:{type:topic}}")
+
self._running = True
self.start()
- def registerObjectClass(self, schema):
+ def register_object_class(self, schema):
"""
- Register an instance of a SchemaObjectClass with this agent
+ Register an instance of a SchemaClass with this agent
"""
# @todo: need to update subscriptions
# @todo: need to mark schema as "non-const"
- if not isinstance(schema, SchemaObjectClass):
- raise TypeError("SchemaObjectClass instance expected")
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass instance expected")
self._lock.acquire()
try:
- classId = schema.getClassId()
- pname = classId.getPackageName()
- cname = classId.getClassName()
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
if pname not in self._packages:
self._packages[pname] = [cname]
else:
@@ -96,14 +107,13 @@ class Agent(Thread):
finally:
self._lock.release()
-
- def registerEventClass(self, cls):
- logging.error("!!!Agent.registerEventClass() TBD!!!")
+ def register_event_class(self, schema):
+ return self.register_object_class(schema)
def raiseEvent(self, qmfEvent):
logging.error("!!!Agent.raiseEvent() TBD!!!")
- def addObject(self, data ):
+ def add_object(self, data ):
"""
Register an instance of a QmfAgentData object.
"""
@@ -112,9 +122,13 @@ class Agent(Thread):
if not isinstance(data, QmfAgentData):
raise TypeError("QmfAgentData instance expected")
+ id_ = data.get_object_id()
+ if not id_:
+ raise TypeError("No identifier assigned to QmfAgentData!")
+
self._lock.acquire()
try:
- self._agent_data[data.getObjectId()] = data
+ self._agent_data[id_] = data
finally:
self._lock.release()
@@ -147,37 +161,32 @@ class Agent(Thread):
while self._running:
now = datetime.datetime.utcnow()
- print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+ # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
if now >= next_heartbeat:
self._ind_sender.send(self._makeAgentIndMsg())
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
- print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
+ # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
try:
- logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
- self._session.next_receiver(timeout = timeout)
+ self._session.next_receiver(timeout=timeout)
except Empty:
- pass
- except KeyboardInterrupt:
- break
+ continue
try:
msg = self._locate_receiver.fetch(timeout = 0)
- if msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
except Empty:
- pass
+ msg = None
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
try:
msg = self._direct_receiver.fetch(timeout = 0)
- if msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
except Empty:
- pass
-
-
+ msg = None
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=True)
#
# Private:
@@ -187,8 +196,8 @@ class Agent(Thread):
"""
Create an agent indication message identifying this agent
"""
- _map = self.getAgentId().mapEncode()
- _map["schemaTimestamp"] = self._schema_timestamp
+ _map = {"_name": self.get_name(),
+ "_schema_timestamp": self._schema_timestamp}
return Message( subject=makeSubject(OpCode.agent_ind),
properties={"method":"response"},
content={MsgKey.agent_info: _map})
@@ -241,9 +250,11 @@ class Agent(Thread):
reply = True
if "method" in props and props["method"] == "request":
- if MsgKey.query in cmap:
- agentIdMap = self.getAgentId().mapEncode()
- reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap))
+ query = cmap.get(MsgKey.query)
+ if query is not None:
+ # fake a QmfData containing my identifier for the query compare
+ tmpData = QmfData(_values={"_name": self.get_name()})
+ reply = QmfQuery(query).evaluate(tmpData)
if reply:
try:
@@ -272,10 +283,9 @@ class Agent(Thread):
if target == QmfQuery._TARGET_PACKAGES:
self._queryPackages( msg, query )
elif target == QmfQuery._TARGET_SCHEMA_ID:
- self._querySchemaId( msg, query )
+ self._querySchema( msg, query, _idOnly=True )
elif target == QmfQuery._TARGET_SCHEMA:
- logging.warning("!!! Query TARGET=SCHEMA TBD !!!")
- #self._querySchema( query.getPredicate(), _idOnly=False )
+ self._querySchema( msg, query)
elif target == QmfQuery._TARGET_AGENT:
logging.warning("!!! Query TARGET=AGENT TBD !!!")
elif target == QmfQuery._TARGET_OBJECT_ID:
@@ -294,7 +304,7 @@ class Agent(Thread):
self._lock.acquire()
try:
for name in self._packages.iterkeys():
- if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})):
+ if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
pnames.append(name)
finally:
self._lock.release()
@@ -312,23 +322,32 @@ class Agent(Thread):
logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
- def _querySchemaId( self, msg, query ):
+ def _querySchema( self, msg, query, _idOnly=False ):
"""
"""
schemas = []
self._lock.acquire()
try:
- for schemaId in self._schema.iterkeys():
- if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})):
- schemas.append(schemaId.mapEncode())
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
finally:
self._lock.release()
try:
tmp_snd = self._session.sender( msg.reply_to )
+
+ if _idOnly:
+ content = {MsgKey.schema_id: schemas}
+ else:
+ content = {MsgKey.schema:schemas}
+
m = Message( subject=makeSubject(OpCode.data_ind),
properties={"method":"response"},
- content={MsgKey.schema_id: schemas} )
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
tmp_snd.send(m)
@@ -340,35 +359,45 @@ class Agent(Thread):
##==============================================================================
- ## OBJECTS
+ ## DATA MODEL
##==============================================================================
-class QmfAgentData(QmfManaged):
+class QmfAgentData(QmfData):
"""
A managed data object that is owned by an agent.
"""
- def __init__(self, _agent, _schema, _props={}):
- """
- @type _agent: class Agent
- @param _agent: the agent that manages this object.
- @type _schema: class SchemaObjectClass
- @param _schema: the schema used to describe this data object
- @type _props: map of "name"=<value> pairs
- @param _props: initial values for all properties in this object
- """
- super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(),
- _schema=_schema,
- _props=_props)
+
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+ _tag=_tag, _ctime=ctime,
+ _utime=ctime, _object_id=_object_id,
+ _schema=_schema, _const=False)
+ self._agent = agent
def destroy(self):
- self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000)
+ self._dtime = long(time.time() * 1000)
# @todo: publish change
- def setProperty( self, _name, _value):
- super(QmfAgentData, self).setProperty(_name, _value)
+ def is_deleted(self):
+ return self._dtime == 0
+
+ def set_value(self, _name, _value, _subType=None):
+ super(QmfAgentData, self).set_value(_name, _value, _subType)
# @todo: publish change
+ def inc_value(self, name, delta):
+ """ add the delta to the property """
+ # @todo: need to take write-lock
+ logging.error(" TBD!!!")
+
+ def dec_value(self, name, delta):
+ """ subtract the delta from the property """
+ # @todo: need to take write-lock
+ logging.error(" TBD!!!")
################################################################################
@@ -377,22 +406,87 @@ class QmfAgentData(QmfManaged):
################################################################################
if __name__ == '__main__':
- import time
+ # static test cases - no message passing, just exercise API
+ from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+ SchemaMethod, SchemaEventClass)
+
logging.getLogger().setLevel(logging.INFO)
- logging.info( "Starting Connection" )
- _c = Connection("localhost")
- _c.connect()
- #c.start()
- logging.info( "Starting Agent" )
- _agent = Agent("redhat.com", "agent", "tross")
- _agent.setConnection(_c)
+ logging.info( "Create an Agent" )
+ _agent_name = AgentName("redhat.com", "agent", "tross")
+ _agent = Agent(str(_agent_name))
- logging.info( "Running Agent" )
-
- while True:
- try:
- time.sleep(10)
- except KeyboardInterrupt:
- break
-
+ logging.info( "Get agent name: '%s'" % _agent.get_name())
+
+ logging.info( "Create SchemaObjectClass" )
+
+ _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"])
+ # add properties
+ _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ # These two properties can be set via the method call
+ _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod(_desc="Method to set string and int in object." )
+ _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ print("Schema Map='%s'" % str(_schema.map_encode()))
+
+ _agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ logging.info( "Create QmfAgentData" )
+
+ _obj = QmfAgentData( _agent, _schema=_schema )
+ _obj.set_value("index1", 100)
+ _obj.set_value("index2", "a name" )
+ _obj.set_value("set_string", "UNSET")
+ _obj.set_value("set_int", 0)
+ _obj.set_value("query_count", 0)
+ _obj.set_value("method_call_count", 0)
+
+ print("Obj1 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object( _obj )
+
+ _obj = QmfAgentData( _agent,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0},
+ _schema=_schema)
+
+ print("Obj2 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object(_obj)
+
+ ##############
+
+
+
+ logging.info( "Create SchemaEventClass" )
+
+ _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test data schema",
+ _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+ _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ print("Event Map='%s'" % str(_event.map_encode()))
+
+ _agent.register_event_class(_event)