summaryrefslogtreecommitdiff
path: root/qpid/python/qmf/qmfAgent.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qmf/qmfAgent.py')
-rw-r--r--qpid/python/qmf/qmfAgent.py303
1 files changed, 239 insertions, 64 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index 278eafedbc..89be722833 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -21,13 +21,56 @@ import sys
import logging
import datetime
import time
-from threading import Thread, Lock
+import Queue
+from threading import Thread, Lock, currentThread
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
makeSubject, parseSubject, OpCode, QmfQuery,
SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass)
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+ ##==============================================================================
+ ## METHOD CALL
+ ##==============================================================================
+
+class _MethodCallHandle(object):
+ """
+ Private class used to hold context when handing off a method call to the
+ application. Given to the app in a WorkItem, provided to the agent when
+ method_response() is invoked.
+ """
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+ self.correlation_id = correlation_id
+ self.reply_to = reply_to
+ self.meth_name = meth_name
+ self.oid = _oid
+
+class MethodCallParams(object):
+ """
+ """
+ def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+ self._meth_name = name
+ self._oid = _oid
+ self._in_args = _in_args
+ self._user_id = _user_id
+
+ def get_name(self):
+ return self._meth_name
+
+ def get_object_id(self):
+ return self._oid
+
+ def get_args(self):
+ return self._in_args
+
+ def get_user_id(self):
+ return self._user_id
+
##==============================================================================
@@ -54,6 +97,8 @@ class Agent(Thread):
self._schema_timestamp = long(0)
self._schema = {}
self._agent_data = {}
+ self._work_q = Queue.Queue()
+ self._work_q_put = False
def get_name(self):
return self.name
@@ -63,9 +108,9 @@ class Agent(Thread):
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.debug("my direct addr=%s" % my_addr)
- logging.debug("agent.locate addr=%s" % locate_addr)
- logging.debug("agent.ind addr=%s" % ind_addr)
+ logging.error("my direct addr=%s" % my_addr)
+ logging.error("agent.locate addr=%s" % locate_addr)
+ logging.error("agent.ind addr=%s" % ind_addr)
self._conn = conn
self._session = self._conn.session()
@@ -133,30 +178,60 @@ class Agent(Thread):
self._lock.release()
- def methodResponse(self, context, status, text, arguments):
- logging.error("!!!Agent.methodResponse() TBD!!!")
+ def method_response(self, handle, _out_args=None, _error=None):
+ """
+ """
+ if not isinstance(handle, _MethodCallHandle):
+ raise TypeError("Invalid handle passed to method_response!")
+
+ _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+ if handle.oid is not None:
+ _map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if _out_args is not None:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+ if _error is not None:
+ if not isinstance(_error, QmfData):
+ raise TypeError("Invalid type for error - must be QmfData")
+ _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+ msg = Message(subject=makeSubject(OpCode.response),
+ properties={"method":"response"},
+ content={MsgKey.method:_map})
+ msg.correlation_id = handle.correlation_id
+
+ try:
+ tmp_snd = self._session.sender( handle.reply_to )
+ tmp_snd.send(msg)
+ logging.debug("method-response sent to [%s]" % handle.reply_to)
+ except SendError, e:
+ logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
- def getWorkItemCount(self):
+ def get_workitem_count(self):
"""
Returns the count of pending WorkItems that can be retrieved.
"""
- logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+ return self._work_q.qsize()
- def getNextWorkItem(self, timeout=None):
+ def get_next_workitem(self, timeout=None):
"""
Obtains the next pending work item, or None if none available.
"""
- logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
- def releaseWorkItem(self, wi):
+ def release_workitem(self, wi):
"""
Releases a WorkItem instance obtained by getNextWorkItem(). Called when
the application has finished processing the WorkItem.
"""
- logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+ pass
def run(self):
+ global _callback_thread
next_heartbeat = datetime.datetime.utcnow()
while self._running:
@@ -174,19 +249,29 @@ class Agent(Thread):
except Empty:
continue
- try:
- msg = self._locate_receiver.fetch(timeout = 0)
- except Empty:
- msg = None
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
-
- try:
- msg = self._direct_receiver.fetch(timeout = 0)
- except Empty:
- msg = None
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
+ while True:
+ try:
+ msg = self._locate_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
+
+ while True:
+ try:
+ msg = self._direct_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=True)
+
+ if self._work_q_put and self._notifier:
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
+ _callback_thread = currentThread()
+ logging.info("Calling agent notifier.indication")
+ self._notifier.indication()
+ _callback_thread = None
#
# Private:
@@ -227,7 +312,7 @@ class Agent(Thread):
elif opcode == OpCode.get_query:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
- logging.warning("!!! METHOD_REQ TBD !!!")
+ self._handleMethodReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.cancel_subscription:
logging.warning("!!! CANCEL_SUB TBD !!!")
elif opcode == OpCode.create_subscription:
@@ -253,7 +338,7 @@ class Agent(Thread):
query = cmap.get(MsgKey.query)
if query is not None:
# fake a QmfData containing my identifier for the query compare
- tmpData = QmfData(_values={"_name": self.get_name()})
+ tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
reply = QmfQuery(query).evaluate(tmpData)
if reply:
@@ -278,24 +363,53 @@ class Agent(Thread):
if "method" in props and props["method"] == "request":
qmap = cmap.get(MsgKey.query)
if qmap:
- query = QmfQuery(qmap)
- target = query.getTarget()
- if target == QmfQuery._TARGET_PACKAGES:
+ query = QmfQuery.from_map(qmap)
+ target = query.get_target()
+ if target == QmfQuery.TARGET_PACKAGES:
self._queryPackages( msg, query )
- elif target == QmfQuery._TARGET_SCHEMA_ID:
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
self._querySchema( msg, query, _idOnly=True )
- elif target == QmfQuery._TARGET_SCHEMA:
+ elif target == QmfQuery.TARGET_SCHEMA:
self._querySchema( msg, query)
- elif target == QmfQuery._TARGET_AGENT:
- logging.warning("!!! Query TARGET=AGENT TBD !!!")
- elif target == QmfQuery._TARGET_OBJECT_ID:
- logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!")
- elif target == QmfQuery._TARGET_OBJECT:
- logging.warning("!!! Query TARGET=OBJECT TBD !!!")
+ elif target == QmfQuery.TARGET_AGENT:
+ logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ self._queryData(msg, query, _idOnly=True)
+ elif target == QmfQuery.TARGET_OBJECT:
+ self._queryData(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
+
+ def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Method Request
+ """
+ if "method" in props and props["method"] == "request":
+ mname = cmap.get(SchemaMethod.KEY_NAME)
+ if not mname:
+ logging.warning("Invalid method call from '%s': no name"
+ % msg.reply_to)
+ return
+
+ in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+ oid = cmap.get(QmfData.KEY_OBJECT_ID)
+
+ print("!!! ci=%s rt=%s mn=%s oid=%s" %
+ (msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid))
+
+ handle = _MethodCallHandle(msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid)
+ param = MethodCallParams( mname, oid, in_args, msg.user_id)
+ self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+ self._work_q_put = True
+
def _queryPackages(self, msg, query):
"""
Run a query against the list of known packages
@@ -304,7 +418,7 @@ class Agent(Thread):
self._lock.acquire()
try:
for name in self._packages.iterkeys():
- if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
+ if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
pnames.append(name)
finally:
self._lock.release()
@@ -326,36 +440,97 @@ class Agent(Thread):
"""
"""
schemas = []
- self._lock.acquire()
- try:
- for sid,val in self._schema.iteritems():
- if query.evaluate(val):
- if _idOnly:
- schemas.append(sid.map_encode())
- else:
- schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._schema.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
+ self._lock.acquire()
+ try:
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
+ finally:
+ self._lock.release()
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- if _idOnly:
- content = {MsgKey.schema_id: schemas}
- else:
- content = {MsgKey.schema:schemas}
+ tmp_snd = self._session.sender( msg.reply_to )
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ if _idOnly:
+ content = {MsgKey.schema_id: schemas}
+ else:
+ content = {MsgKey.schema:schemas}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
tmp_snd.send(m)
logging.debug("schema_id sent to [%s]" % msg.reply_to)
except SendError, e:
logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+ def _queryData( self, msg, query, _idOnly=False ):
+ """
+ """
+ data_objs = []
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._agent_data.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ data_objs.append(query.get_id())
+ else:
+ data_objs.append(found.map_encode())
+ else: # otherwise, evaluate all data
+ self._lock.acquire()
+ try:
+ for oid,val in self._agent_data.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(val.map_encode())
+ finally:
+ self._lock.release()
+
+ tmp_snd = self._session.sender( msg.reply_to )
+
+ if _idOnly:
+ content = {MsgKey.object_id:data_objs}
+ else:
+ content = {MsgKey.data_obj:data_objs}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
+ tmp_snd.send(m)
+ logging.debug("data reply sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
##==============================================================================
@@ -407,7 +582,7 @@ class QmfAgentData(QmfData):
if __name__ == '__main__':
# static test cases - no message passing, just exercise API
- from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+ from qmfCommon import (AgentName, SchemaProperty, qmfTypes,
SchemaMethod, SchemaEventClass)
logging.getLogger().setLevel(logging.INFO)
@@ -436,8 +611,8 @@ if __name__ == '__main__':
# add method
_meth = SchemaMethod(_desc="Method to set string and int in object." )
- _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
_schema.add_method( "set_meth", _meth )
# Add schema to Agent