diff options
Diffstat (limited to 'qpid/python/qmf/qmfAgent.py')
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 303 |
1 files changed, 239 insertions, 64 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 278eafedbc..89be722833 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -21,13 +21,56 @@ import sys import logging import datetime import time -from threading import Thread, Lock +import Queue +from threading import Thread, Lock, currentThread from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, makeSubject, parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass) + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + +# global flag that indicates which thread (if any) is +# running the agent notifier callback +_callback_thread=None + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class _MethodCallHandle(object): + """ + Private class used to hold context when handing off a method call to the + application. Given to the app in a WorkItem, provided to the agent when + method_response() is invoked. + """ + def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + self.correlation_id = correlation_id + self.reply_to = reply_to + self.meth_name = meth_name + self.oid = _oid + +class MethodCallParams(object): + """ + """ + def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + self._meth_name = name + self._oid = _oid + self._in_args = _in_args + self._user_id = _user_id + + def get_name(self): + return self._meth_name + + def get_object_id(self): + return self._oid + + def get_args(self): + return self._in_args + + def get_user_id(self): + return self._user_id + ##============================================================================== @@ -54,6 +97,8 @@ class Agent(Thread): self._schema_timestamp = long(0) self._schema = {} self._agent_data = {} + self._work_q = Queue.Queue() + self._work_q_put = False def get_name(self): return self.name @@ -63,9 +108,9 @@ class Agent(Thread): locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - logging.debug("my direct addr=%s" % my_addr) - logging.debug("agent.locate addr=%s" % locate_addr) - logging.debug("agent.ind addr=%s" % ind_addr) + logging.error("my direct addr=%s" % my_addr) + logging.error("agent.locate addr=%s" % locate_addr) + logging.error("agent.ind addr=%s" % ind_addr) self._conn = conn self._session = self._conn.session() @@ -133,30 +178,60 @@ class Agent(Thread): self._lock.release() - def methodResponse(self, context, status, text, arguments): - logging.error("!!!Agent.methodResponse() TBD!!!") + def method_response(self, handle, _out_args=None, _error=None): + """ + """ + if not isinstance(handle, _MethodCallHandle): + raise TypeError("Invalid handle passed to method_response!") + + _map = {SchemaMethod.KEY_NAME:handle.meth_name} + if handle.oid is not None: + _map[QmfData.KEY_OBJECT_ID] = handle.oid + if _out_args is not None: + _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() + if _error is not None: + if not isinstance(_error, QmfData): + raise TypeError("Invalid type for error - must be QmfData") + _map[SchemaMethod.KEY_ERROR] = _error.map_encode() + + msg = Message(subject=makeSubject(OpCode.response), + properties={"method":"response"}, + content={MsgKey.method:_map}) + msg.correlation_id = handle.correlation_id + + try: + tmp_snd = self._session.sender( handle.reply_to ) + tmp_snd.send(msg) + logging.debug("method-response sent to [%s]" % handle.reply_to) + except SendError, e: + logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) - def getWorkItemCount(self): + def get_workitem_count(self): """ Returns the count of pending WorkItems that can be retrieved. """ - logging.error("!!!Agent.getWorkItemCount() TBD!!!") + return self._work_q.qsize() - def getNextWorkItem(self, timeout=None): + def get_next_workitem(self, timeout=None): """ Obtains the next pending work item, or None if none available. """ - logging.error("!!!Agent.getNextWorkItem() TBD!!!") + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi - def releaseWorkItem(self, wi): + def release_workitem(self, wi): """ Releases a WorkItem instance obtained by getNextWorkItem(). Called when the application has finished processing the WorkItem. """ - logging.error("!!!Agent.releaseWorkItem() TBD!!!") + pass def run(self): + global _callback_thread next_heartbeat = datetime.datetime.utcnow() while self._running: @@ -174,19 +249,29 @@ class Agent(Thread): except Empty: continue - try: - msg = self._locate_receiver.fetch(timeout = 0) - except Empty: - msg = None - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) - - try: - msg = self._direct_receiver.fetch(timeout = 0) - except Empty: - msg = None - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) + while True: + try: + msg = self._locate_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) + + while True: + try: + msg = self._direct_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) + + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling agent notifier.indication") + self._notifier.indication() + _callback_thread = None # # Private: @@ -227,7 +312,7 @@ class Agent(Thread): elif opcode == OpCode.get_query: self._handleQueryMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.method_req: - logging.warning("!!! METHOD_REQ TBD !!!") + self._handleMethodReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.cancel_subscription: logging.warning("!!! CANCEL_SUB TBD !!!") elif opcode == OpCode.create_subscription: @@ -253,7 +338,7 @@ class Agent(Thread): query = cmap.get(MsgKey.query) if query is not None: # fake a QmfData containing my identifier for the query compare - tmpData = QmfData(_values={"_name": self.get_name()}) + tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) reply = QmfQuery(query).evaluate(tmpData) if reply: @@ -278,24 +363,53 @@ class Agent(Thread): if "method" in props and props["method"] == "request": qmap = cmap.get(MsgKey.query) if qmap: - query = QmfQuery(qmap) - target = query.getTarget() - if target == QmfQuery._TARGET_PACKAGES: + query = QmfQuery.from_map(qmap) + target = query.get_target() + if target == QmfQuery.TARGET_PACKAGES: self._queryPackages( msg, query ) - elif target == QmfQuery._TARGET_SCHEMA_ID: + elif target == QmfQuery.TARGET_SCHEMA_ID: self._querySchema( msg, query, _idOnly=True ) - elif target == QmfQuery._TARGET_SCHEMA: + elif target == QmfQuery.TARGET_SCHEMA: self._querySchema( msg, query) - elif target == QmfQuery._TARGET_AGENT: - logging.warning("!!! Query TARGET=AGENT TBD !!!") - elif target == QmfQuery._TARGET_OBJECT_ID: - logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!") - elif target == QmfQuery._TARGET_OBJECT: - logging.warning("!!! Query TARGET=OBJECT TBD !!!") + elif target == QmfQuery.TARGET_AGENT: + logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + elif target == QmfQuery.TARGET_OBJECT_ID: + self._queryData(msg, query, _idOnly=True) + elif target == QmfQuery.TARGET_OBJECT: + self._queryData(msg, query) else: logging.warning("Unrecognized query target: '%s'" % str(target)) + + def _handleMethodReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Method Request + """ + if "method" in props and props["method"] == "request": + mname = cmap.get(SchemaMethod.KEY_NAME) + if not mname: + logging.warning("Invalid method call from '%s': no name" + % msg.reply_to) + return + + in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) + oid = cmap.get(QmfData.KEY_OBJECT_ID) + + print("!!! ci=%s rt=%s mn=%s oid=%s" % + (msg.correlation_id, + msg.reply_to, + mname, + oid)) + + handle = _MethodCallHandle(msg.correlation_id, + msg.reply_to, + mname, + oid) + param = MethodCallParams( mname, oid, in_args, msg.user_id) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) + self._work_q_put = True + def _queryPackages(self, msg, query): """ Run a query against the list of known packages @@ -304,7 +418,7 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})): + if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): pnames.append(name) finally: self._lock.release() @@ -326,36 +440,97 @@ class Agent(Thread): """ """ schemas = [] - self._lock.acquire() - try: - for sid,val in self._schema.iteritems(): - if query.evaluate(val): - if _idOnly: - schemas.append(sid.map_encode()) - else: - schemas.append(val.map_encode()) - finally: - self._lock.release() + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._schema.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema + self._lock.acquire() + try: + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) + finally: + self._lock.release() - try: - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: - content = {MsgKey.schema_id: schemas} - else: - content = {MsgKey.schema:schemas} + tmp_snd = self._session.sender( msg.reply_to ) - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: tmp_snd.send(m) logging.debug("schema_id sent to [%s]" % msg.reply_to) except SendError, e: logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + def _queryData( self, msg, query, _idOnly=False ): + """ + """ + data_objs = [] + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._agent_data.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + data_objs.append(query.get_id()) + else: + data_objs.append(found.map_encode()) + else: # otherwise, evaluate all data + self._lock.acquire() + try: + for oid,val in self._agent_data.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) + finally: + self._lock.release() + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.object_id:data_objs} + else: + content = {MsgKey.data_obj:data_objs} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("data reply sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) ##============================================================================== @@ -407,7 +582,7 @@ class QmfAgentData(QmfData): if __name__ == '__main__': # static test cases - no message passing, just exercise API - from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes, + from qmfCommon import (AgentName, SchemaProperty, qmfTypes, SchemaMethod, SchemaEventClass) logging.getLogger().setLevel(logging.INFO) @@ -436,8 +611,8 @@ if __name__ == '__main__': # add method _meth = SchemaMethod(_desc="Method to set string and int in object." ) - _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) _schema.add_method( "set_meth", _meth ) # Add schema to Agent |