diff options
author | Ted Ross <tross@apache.org> | 2009-12-16 22:24:57 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-12-16 22:24:57 +0000 |
commit | 9fc81d4e597ae271632d55c87fc6a61ed3f62a49 (patch) | |
tree | 5232f7fb4302794b5b659776676f1d70c3af8139 | |
parent | b9ed41f57178248064be233ea42887e2e9eed497 (diff) | |
download | qpid-python-9fc81d4e597ae271632d55c87fc6a61ed3f62a49.tar.gz |
QPID-2261 - Applied patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@891456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 223 | ||||
-rw-r--r-- | qpid/python/qmf/qmfCommon.py | 154 | ||||
-rw-r--r-- | qpid/python/qmf/qmfConsole.py | 337 | ||||
-rw-r--r-- | qpid/python/qmf/test/agent_test.py | 104 | ||||
-rw-r--r-- | qpid/python/qmf/test/console_test.py | 67 |
5 files changed, 624 insertions, 261 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 427d3e549d..f211cdb383 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -21,12 +21,12 @@ import sys import socket import os import logging -from threading import Thread +from threading import Thread, Lock from qpid.messaging import Connection, Message from uuid import uuid4 from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject, - parseSubject, OpCode) + parseSubject, OpCode, Query, SchemaObjectClass, _doQuery) @@ -49,6 +49,10 @@ class Agent(Thread): self._address = str(self._id) self._notifier = notifier self._conn = None + self._lock = Lock() + self._data_schema = {} + self._event_schema = {} + self._agent_data = {} def getAgentId(self): return AgentId(self.vendor, self.product, self.name) @@ -61,52 +65,66 @@ class Agent(Thread): self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) self._running = True self.start() + + def registerObjectClass(self, schema): + """ + Register an instance of a SchemaObjectClass with this agent + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(schema, SchemaObjectClass): + raise TypeError("SchemaObjectClass instance expected") + self._lock.acquire() + try: + self._data_schema[schema.getClassId()] = schema + finally: + self._lock.release() - def _dispatch(self, msg, _direct=False): + + def registerEventClass(self, cls): + logging.error("!!!Agent.registerEventClass() TBD!!!") + + def raiseEvent(self, qmfEvent): + logging.error("!!!Agent.raiseEvent() TBD!!!") + + def addObject(self, data ): """ - @param _direct: True if msg directly addressed to this agent. + Register an instance of a QmfAgentData object. """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(data, QmfAgentData): + raise TypeError("QmfAgentData instance expected") + + self._lock.acquire() try: - version,opcode = parseSubject(msg.subject) - except: - logging.debug("Ignoring unrecognized message '%s'" % msg.subject) - return + self._agent_data[data.getObjectId()] = data + finally: + self._lock.release() - cmap = {}; props={} - if msg.content_type == "amqp/map": - cmap = msg.content - if msg.properties: - props = msg.properties - if opcode == OpCode.agent_locate: - reply = False - if "method" in props and props["method"] == "request": - if "query" in cmap: - if self._doQuery(cmap["query"]): - reply=True - else: - reply=True - - if reply: - try: - tmp_snd = self._session.sender( msg.reply_to ) - m = Message( subject=makeSubject(OpCode.agent_locate), - properties={"method":"response"}, - content={"name": {"vendor":"redhat.com", - "product":"agent", - "name":"tross"}}, - correlation_id=msg.correlation_id) - tmp_snd.send(m) - logging.debug("reply-to [%s] sent" % msg.reply_to) - except e: - logging.error("Failed to send reply to msg '%s'" % str(e)) - else: - logging.debug("Ignoring invalid agent-locate msg") - else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" - % opcode) + def methodResponse(self, context, status, text, arguments): + logging.error("!!!Agent.methodResponse() TBD!!!") + def getWorkItemCount(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + logging.error("!!!Agent.getWorkItemCount() TBD!!!") + + def getNextWorkItem(self, timeout=None): + """ + Obtains the next pending work item, or None if none available. + """ + logging.error("!!!Agent.getNextWorkItem() TBD!!!") + + def releaseWorkItem(self, wi): + """ + Releases a WorkItem instance obtained by getNextWorkItem(). Called when + the application has finished processing the WorkItem. + """ + logging.error("!!!Agent.releaseWorkItem() TBD!!!") def run(self): @@ -114,7 +132,7 @@ class Agent(Thread): while self._running: try: msg = self._locate_receiver.fetch(1) - logging.info("Agent Locate Rcvd: '%s'" % msg) + logging.debug("Agent Locate Rcvd: '%s'" % msg) if msg.content_type == "amqp/map": self._dispatch(msg, _direct=False) except KeyboardInterrupt: @@ -124,7 +142,7 @@ class Agent(Thread): try: msg = self._direct_receiver.fetch(1) - logging.info("Agent Msg Rcvd: '%s'" % msg) + logging.debug("Agent Msg Rcvd: '%s'" % msg) if msg.content_type == "amqp/map": self._dispatch(msg, _direct=True) except KeyboardInterrupt: @@ -132,61 +150,100 @@ class Agent(Thread): except: pass + # @todo: actually implement the periodic agent-ind + # message generation! count+= 1 if count == 5: count = 0 - m = Message( subject=makeSubject(OpCode.agent_ind), - properties={"method":"indication"}, - content={"name": {"vendor":"redhat.com", - "product":"agent", - "name":"tross"}} ) - self._ind_sender.send(m) - logging.info("Agent Indication Sent") - - - def registerObjectClass(self, cls): - logging.error("!!!Agent.registerObjectClass() TBD!!!") + self._ind_sender.send(self._makeAgentIndMsg()) + logging.debug("Agent Indication Sent") - def registerEventClass(self, cls): - logging.error("!!!Agent.registerEventClass() TBD!!!") + # + # Private: + # - def raiseEvent(self, qmfEvent): - logging.error("!!!Agent.raiseEvent() TBD!!!") + def _makeAgentIndMsg(self): + """ + Create an agent indication message identifying this agent + """ + return Message( subject=makeSubject(OpCode.agent_ind), + properties={"method":"response"}, + content={Query._TARGET_AGENT_ID: + self.getAgentId().mapEncode()}) - def addObject(self, qmfAgentData ): - logging.error("!!!Agent.addObject() TBD!!!") - def methodResponse(self, context, status, text, arguments): - logging.error("!!!Agent.methodResponse() TBD!!!") - def getWorkItemCount(self): - """ - Returns the count of pending WorkItems that can be retrieved. + def _dispatch(self, msg, _direct=False): """ - logging.error("!!!Agent.getWorkItemCount() TBD!!!") + Process a message from a console. - def getNextWorkItem(self, timeout=None): - """ - Obtains the next pending work item, or None if none available. + @param _direct: True if msg directly addressed to this agent. """ - logging.error("!!!Agent.getNextWorkItem() TBD!!!") + logging.error( "Message received from Console! [%s]" % msg ) + try: + version,opcode = parseSubject(msg.subject) + except: + logging.debug("Ignoring unrecognized message '%s'" % msg.subject) + return - def releaseWorkItem(self, wi): + cmap = {}; props={} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_locate: + self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.get_query: + logging.warning("!!! GET_QUERY TBD !!!") + elif opcode == OpCode.method_req: + logging.warning("!!! METHOD_REQ TBD !!!") + elif opcode == OpCode.cancel_subscription: + logging.warning("!!! CANCEL_SUB TBD !!!") + elif opcode == OpCode.create_subscription: + logging.warning("!!! CREATE_SUB TBD !!!") + elif opcode == OpCode.renew_subscription: + logging.warning("!!! RENEW_SUB TBD !!!") + elif opcode == OpCode.schema_query: + logging.warning("!!! SCHEMA_QUERY TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % opcode) + + def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ): """ - Releases a WorkItem instance obtained by getNextWorkItem(). Called when - the application has finished processing the WorkItem. + Process a received agent-locate message """ - logging.error("!!!Agent.releaseWorkItem() TBD!!!") + logging.debug("_handleAgentLocateMsg") + + reply = True + if "method" in props and props["method"] == "request": + if "query" in cmap: + query = cmap["query"] + # is the query an agent locate? + if Query._TARGET in query and query[Query._TARGET] == {Query._TARGET_AGENT_ID:None}: + if Query._PREDICATE in query: + # does this agent match the predicate? + reply = _doQuery( query[Query._PREDICATE], self.getAgentId().mapEncode() ) + else: + reply = False + logging.debug("Ignoring query - not an agent-id query: '%s'" % query) + reply=True + + if reply: + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = self._makeAgentIndMsg() + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("agent-ind sent to [%s]" % msg.reply_to) + except: + logging.error("Failed to send reply to agent-ind msg '%s'" % msg) + else: + logging.debug("agent-locate msg not mine - no reply sent") - def _doQuery(self, query): - # query = cmap["query"] - # if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and - # "product" in query and (query["product"] == "*" or query["product"] == self.product) and - # "name" in query and (query["name"] == "*" or query["name"] == self.name)): - # logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name)) - # logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id)) - logging.error("!!!Agent._doQuery() TBD!!!") - return True ##============================================================================== @@ -228,7 +285,7 @@ class QmfAgentData(QmfManaged): if __name__ == '__main__': import time - #logging.getLogger().setLevel(logging.INFO) + logging.getLogger().setLevel(logging.INFO) logging.info( "Starting Connection" ) _c = Connection("localhost") _c.connect() diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index cfdc87806b..daa5cea2fd 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -48,9 +48,28 @@ AMQP_QMF_VERSION = 4 AMQP_QMF_SUBJECT_FMT = "%s%d.%s" class OpCode(object): + noop = "noop" + + # codes sent by a console and processed by the agent agent_locate = "agent-locate" + cancel_subscription = "cancel-subscription" + create_subscription = "create-subscription" + get_query = "get_query" + method_req = "method" + renew_subscription = "renew-subscription" + schema_query = "schema-query" # @todo: deprecate + + # codes sent by the agent to a console agent_ind = "agent" - noop = "noop" + data_ind = "data" + event_ind = "event" + managed_object = "managed-object" + object_ind = "object" + response = "response" + schema_ind="schema" # @todo: deprecate + + + def makeSubject(_code): """ @@ -69,6 +88,19 @@ def parseSubject(_sub): return _sub[3:].split('.', 1) +class Notifier(object): + """ + Virtual base class that defines a call back which alerts the application that + a QMF Console notification is pending. + """ + def indication(self): + """ + Called when one or more items are ready for the application to process. + This method may be called by an internal QMF library thread. Its purpose is to + indicate that the application should process pending work items. + """ + raise Exception("The indication method must be overridden by the application!") + ##============================================================================== @@ -966,7 +998,127 @@ class MethodResponse(object): # ##============================================================================== + +def _doQuery(predicate, params ): + """ + Given the predicate from a query, and a map of named parameters, apply the predicate + to the parameters, and return True or False. + """ + if type(predicate) != list or len(predicate) < 1: + return False + + opr = predicate[0] + if opr == Query._CMP_TRUE: + logging.info("_doQuery() TRUE: [%s]" % predicate ) + return True + elif opr == Query._CMP_FALSE: + logging.info("_doQuery() FALSE: [%s]" % predicate ) + return False + elif opr == Query._LOGIC_AND: + logging.info("_doQuery() AND: [%s]" % predicate ) + rc = False + for exp in predicate[1:]: + rc = _doQuery( exp, params ) + if not rc: + break + return rc + + elif opr == Query._LOGIC_OR: + logging.info("_doQuery() OR: [%s]" % predicate ) + rc = False + for exp in predicate[1:]: + rc = _doQuery( exp, params ) + if rc: + break + return rc + + elif opr == Query._LOGIC_NOT: + logging.info("_doQuery() NOT: [%s]" % predicate ) + if len(predicate) != 2: + logging.warning("Malformed query not-expression received: '%s'" % predicate) + return False + return not _doQuery( predicate[1:], params ) + + elif opr in [Query._CMP_EQ, Query._CMP_NE, Query._CMP_LT, + Query._CMP_LE, Query._CMP_GT, Query._CMP_GE, + Query._CMP_RE]: + if len(predicate) != 3: + logging.warning("Malformed query compare expression received: '%s'" % predicate) + return False + # @todo: support regular expression match + name = predicate[1] + if name not in params: + logging.warning("Malformed query, attribute '%s' not present." % name) + return False + arg1 = params[name] + arg1Type = type(arg1) + logging.info("_doQuery() CMP: [%s] value='%s'" % (predicate, arg1) ) + try: + arg2 = arg1Type(predicate[2]) + if opr == Query._CMP_EQ: return arg1 == arg2 + if opr == Query._CMP_NE: return arg1 != arg2 + if opr == Query._CMP_LT: return arg1 < arg2 + if opr == Query._CMP_LE: return arg1 <= arg2 + if opr == Query._CMP_GT: return arg1 > arg2 + if opr == Query._CMP_GE: return arg1 >= arg2 + if opr == Query._CMP_RE: + logging.error("!!! RE QUERY TBD !!!") + return False + except: + logging.warning("Malformed query, unable to compare '%s'" % predicate) + return False + + elif opr == Query._CMP_PRESENT: + logging.info("_doQuery() PRESENT: [%s]" % predicate ) + if len(predicate) != 2: + logging.warning("Malformed query present expression received: '%s'" % predicate) + return False + name = predicate[1] + return name in params + + else: + logging.warning("Unknown query operator received: '%s'" % opr) + return False + + + class Query: + _TARGET="what" + _PREDICATE="where" + + _TARGET_PACKAGES="_packages" + _TARGET_OBJECT_ID="_object_id" + _TARGET_SCHEMA="_schema" + _TARGET_SCHEMA_ID="_schema_id" + _TARGET_MGT_DATA="_mgt_data" + _TARGET_AGENT_ID="_agent_id" + + _PRED_PACKAGE="_package_name" + _PRED_CLASS="_class_name" + _PRED_TYPE="_type" + _PRED_HASH="_has_str" + _PRED_SCHEMA_ID="_schema_id" + _PRED_VENDOR="_vendor" + _PRED_PRODUCT="_product" + _PRED_NAME="_name" + _PRED_AGENT_ID="_agent_id" + _PRED_PRIMARY_KEY="_primary_key" + + _CMP_EQ="eq" + _CMP_NE="ne" + _CMP_LT="lt" + _CMP_LE="le" + _CMP_GT="gt" + _CMP_GE="ge" + _CMP_RE="re_match" + _CMP_PRESENT="exists" + _CMP_TRUE="true" + _CMP_FALSE="false" + + _LOGIC_AND="and" + _LOGIC_OR="or" + _LOGIC_NOT="not" + def __init__(self, kwargs={}): pass # if "impl" in kwargs: diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index 24f263adab..e956b07dab 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -30,7 +30,8 @@ from threading import Condition from qpid.messaging import * from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, - AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode) + AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode, + Query, AgentIdFactory, Notifier, _doQuery) @@ -67,9 +68,8 @@ class _Mailbox(object): def fetch(self, timeout=None): self._cv.acquire() try: - if len(self._msgs): - return self._msgs.pop() - self._cv.wait(timeout) + if len(self._msgs) == 0: + self._cv.wait(timeout) if len(self._msgs): return self._msgs.pop() return None @@ -169,6 +169,19 @@ class SequencedWaiter(object): self.lock.release() + def isValid(self, seq): + """ + True if seq is in use, else False (seq is unknown) + """ + seq = long(seq) + self.lock.acquire() + try: + return seq in self.pending + finally: + self.lock.release() + return False + + #class ObjectProxy(QmfObject): class ObjectProxy(object): @@ -198,11 +211,11 @@ class ObjectProxy(object): """ if not self._agent: raise Exception("No Agent associated with this object") - newer = self._agent.get_object(Query({"object_id":object_id}), timeout) + newer = self._agent.get_object(Query({"object_id":None}), timeout) if newer == None: logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) - self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class + #self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class ### def _merge_update(self, newerObject): ### ??? in Rafi's console.py::Object Class @@ -216,32 +229,36 @@ class ObjectProxy(object): -class AgentProxy(object): +class Agent(object): """ A local representation of a remote agent managed by this console. """ - def __init__(self, name): + def __init__(self, agent_id, console): """ @type name: AgentId @param name: uniquely identifies this agent in the AMQP domain. """ - if not name or not isinstance(name, AgentId): - raise Exception( "Attempt to create an Agent without supplying a valid agent name." ); - - self._name = name - self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name) - self._console = None + if not isinstance(agent_id, AgentId): + raise TypeError("parameter must be an instance of class AgentId") + if not isinstance(console, Console): + raise TypeError("parameter must be an instance of class Console") + + self._id = agent_id + self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id) + self._console = console self._sender = None self._packages = [] # list of package names known to this agent self._classes = {} # dict [key:class] of classes known to this agent self._subscriptions = [] # list of active standing subscriptions for this agent - self._exists = False # true when Agent Announce is received from this agent - logging.debug( "Created AgentProxy with address: [%s]" % self._address ) + self._announce_timestamp = long(0) # timestamp when last announce received + logging.debug( "Created Agent with address: [%s]" % self._address ) - def key(self): - return str(self._name) + def getAgentId(self): + return self._id + def isActive(self): + return self._announce_timestamp != 0 def _send_msg(self, msg): """ @@ -250,26 +267,11 @@ class AgentProxy(object): msg.reply_to = self._console.address() handle = self._console._req_correlation.allocate() if handle == 0: - raise Exception("Can not allocate a sequence id!") + raise Exception("Can not allocate a correlation id!") msg.correlation_id = str(handle) self._sender.send(msg) return handle - - - def _fetch_reply_msg(self, handle, timeout=None): - """ - Low-level routine to wait for an expected reply from the agent. - """ - if handle == 0: - raise Exception("Invalid handle") - msg = self._console._req_correlation.get_data( handle, timeout ) - if not msg: - logging.debug("timed out waiting for reply message") - self._console._req_correlation.release( handle ) - return msg - - def get_packages(self): """ Return a list of the names of all packages known to this agent. @@ -371,22 +373,6 @@ class WorkItem(object): -class Notifier(object): - """ - Virtual base class that defines a call back which alerts the application that - a QMF Console notification is pending. - """ - def console_indication(self): - """ - Called when one or more console items are ready for the console application to process. - This method may be called by the internal console management thread. Its purpose is to - indicate that the console application should process pending items. - """ - pass - - - - class Console(Thread): """ A Console manages communications to a collection of agents on behalf of an application. @@ -408,16 +394,17 @@ class Console(Thread): self._notifier = notifier self._conn = None self._session = None - # dict of "agent-direct-address":AgentProxy entries + self._lock = Lock() + # dict of "agent-direct-address":class Agent entries self._agent_map = {} - self._agent_map_lock = Lock() self._direct_recvr = None self._announce_recvr = None self._locate_sender = None self._schema_cache = {} self._req_correlation = SequencedWaiter() self._operational = False - self._agent_discovery = False + self._agent_discovery_predicate = None + self._default_timeout = 60 # lock out run() thread self._cv = Condition() # for passing WorkItems to the application @@ -514,78 +501,96 @@ class Console(Thread): return self._address - def create_agent( self, agent_name ): + def _createAgent( self, agent_id ): """ Factory to create/retrieve an agent for this console """ - if not isinstance(agent_name, AgentId): - raise TypeError("agent_name must be an instance of AgentId") + if not isinstance(agent_id, AgentId): + raise TypeError("parameter must be an instance of class AgentId") - agent = AgentProxy(agent_name) - - self._agent_map_lock.acquire() + self._lock.acquire() try: - if agent_name in self._agent_map: - return self._agent_map[agent_name] + if agent_id in self._agent_map: + return self._agent_map[agent_id] - agent._console = self + agent = Agent(agent_id, self) agent._sender = self._session.sender(agent._address) - self._agent_map[agent_name] = agent + self._agent_map[agent_id] = agent finally: - self._agent_map_lock.release() + self._lock.release() return agent - def destroy_agent( self, agent ): + def destroyAgent( self, agent ): """ Undoes create. """ - if not isinstance(agent, AgentProxy): - raise TypeError("agent must be an instance of AgentProxy") + if not isinstance(agent, Agent): + raise TypeError("agent must be an instance of class Agent") - self._agent_map_lock.acquire() + self._lock.acquire() try: - if agent._name in self._agent_map: - del self._agent_map[agent._name] + if agent._id in self._agent_map: + del self._agent_map[agent._id] finally: - self._agent_map_lock.release() + self._lock.release() - def find_agent(self, agent_name, timeout=None ): + def findAgent(self, agent_id, timeout=None ): """ - Given the name of a particular agent, return an AgentProxy representing - that agent. Return None if the agent does not exist. + Given the id of a particular agent, return an instance of class Agent + representing that agent. Return None if the agent does not exist. """ - self._agent_map_lock.acquire() + if not isinstance(agent_id, AgentId): + raise TypeError("parameter must be an instance of class AgentId") + + self._lock.acquire() try: - if agent_name in self._agent_map: - return self._agent_map[agent_name] + if agent_id in self._agent_map: + return self._agent_map[agent_id] finally: - self._agent_map_lock.release() - - new_agent = self.create_agent(agent_name) - msg = Message(subject=makeSubject(OpCode.agent_locate), - properties={"method":"request"}, - content={"query": {"vendor" : agent_name.vendor(), - "product" : agent_name.product(), - "name" : agent_name.name()}}) - handle = new_agent._send_msg(msg) - if handle == 0: - raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name)) + self._lock.release() + + # agent not present yet - ping it with an agent_locate - msg = new_agent._fetch_reply_msg(handle, timeout) - if not msg: - logging.debug("Unable to contact agent '%s' - no reply." % agent_name) - self.destroy_agent(new_agent) + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)) + msg = Message(subject=makeSubject(OpCode.agent_locate), + properties={"method":"request"}, + content={"query": {Query._TARGET: {Query._TARGET_AGENT_ID:None}, + Query._PREDICATE: + [Query._LOGIC_AND, + [Query._CMP_EQ, "vendor", agent_id.vendor()], + [Query._CMP_EQ, "product", agent_id.product()], + [Query._CMP_EQ, "name", agent_id.name()]]}}) + msg.reply_to = self._address + msg.correlation_id = str(handle) + tmp_sender.send( msg ) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) return None - # @todo - for now, dump the message - logging.info( "agent-locate reply received for %s" % agent_name) - return new_agent + if not timeout: + timeout = self._default_timeout + + new_agent = None + self._req_correlation.get_data( handle, timeout ) + self._req_correlation.release(handle) + self._lock.acquire() + try: + if agent_id in self._agent_map: + new_agent = self._agent_map[agent_id] + finally: + self._lock.release() + return new_agent def run(self): @@ -600,14 +605,14 @@ class Console(Thread): try: msg = self._announce_recvr.fetch(timeout = 0) if msg: - self._rcv_announce(msg) + self._dispatch(msg, _direct=False) except: pass try: msg = self._direct_recvr.fetch(timeout = 0) if msg: - self._rcv_direct(msg) + self._dispatch(msg, _direct=True) except: pass @@ -626,7 +631,7 @@ class Console(Thread): _callback_thread = currentThread() logging.info("Calling console indication") - self._notifier.console_indication() + self._notifier.indication() _callback_thread = None while self._operational and \ @@ -640,79 +645,120 @@ class Console(Thread): # called by run() thread ONLY # - def _rcv_announce(self, msg): + def _dispatch(self, msg, _direct=True): """ - PRIVATE: Process a message received on the announce receiver + PRIVATE: Process a message received from an Agent """ - logging.info( "Announce message received!" ) + logging.error( "Message received from Agent! [%s]" % msg ) try: version,opcode = parseSubject(msg.subject) + # @todo: deal with version mismatch!!! except: - logging.debug("Ignoring unrecognized broadcast message '%s'" % msg.subject) + logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject) return - amap = {}; props = {} + cmap = {}; props = {} if msg.content_type == "amqp/map": - amap = msg.content + cmap = msg.content if msg.properties: props = msg.properties if opcode == OpCode.agent_ind: - # agent indication - if "name" in amap: - if self._agent_discovery: - ind = amap["name"] - if "vendor" in ind and "product" in ind and "name" in ind: - - agent = self.create_agent(AgentId( ind["vendor"], - ind["product"], - ind["name"] )) - if not agent._exists: - # new agent - agent._exists = True - logging.info("AGENT_ADDED for %s" % agent) - wi = WorkItem(WorkItem.AGENT_ADDED, - {"agent": agent}) - self._work_q.put(wi) + self._handleAgentIndMsg( msg, cmap, version, _direct ) + elif opcode == OpCode.data_ind: + logging.warning("!!! data_ind TBD !!!") + elif opcode == OpCode.event_ind: + logging.warning("!!! event_ind TBD !!!") + elif opcode == OpCode.managed_object: + logging.warning("!!! managed_object TBD !!!") + elif opcode == OpCode.object_ind: + logging.warning("!!! object_ind TBD !!!") + elif opcode == OpCode.response: + logging.warning("!!! response TBD !!!") + elif opcode == OpCode.schema_ind: + logging.warning("!!! schema_ind TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") else: logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) - - # called by run() thread ONLY - # - def _rcv_direct(self, msg): + def _handleAgentIndMsg(self, msg, cmap, version, direct): """ - PRIVATE: Process a message sent to my direct receiver + Process a received agent-ind message. This message may be a response to a + agent-locate, or it can be an unsolicited agent announce. """ - logging.info( "direct message received!" ) + logging.debug("_handleAgentIndMsg '%s'" % msg) + + if Query._TARGET_AGENT_ID in cmap: + try: + agent_id = AgentIdFactory(cmap[Query._TARGET_AGENT_ID]) + except: + logging.debug("Bad agent-ind message received: '%s'" % msg) + return + + ignore = True + matched = False + correlated = False if msg.correlation_id: - self._req_correlation.put_data(msg.correlation_id, msg) + correlated = self._req_correlation.isValid(msg.correlation_id) + + if direct and correlated: + ignore = False + elif self._agent_discovery_predicate: + matched = _doQuery( self._agent_discovery_predicate, + agent_id.mapEncode() ) + ignore = not matched + + if not ignore: + agent = None + self._lock.acquire() + try: + if agent_id in self._agent_map: + agent = self._agent_map[agent_id] + finally: + self._lock.release() + if not agent: + # need to create and add a new agent + agent = self._createAgent(agent_id) + old_timestamp = agent._announce_timestamp + agent._announce_timestamp = time.time() - def enable_agent_discovery(self): + if old_timestamp == 0 and matched: + logging.debug("AGENT_ADDED for %s" % agent) + wi = WorkItem(WorkItem.AGENT_ADDED, + {"agent": agent}) + self._work_q.put(wi) + + if correlated: + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + + def enableAgentDiscovery(self, query=None): """ Called to enable the asynchronous Agent Discovery process. Once enabled, AGENT_ADD work items can arrive on the WorkQueue. """ - if not self._agent_discovery: - self._agent_discovery = True - msg = Message(subject=makeSubject(OpCode.agent_locate), - properties={"method":"request"}, - content={"query": {"vendor": "*", - "product": "*", - "name": "*"}}) - self._locate_sender.send(msg) - + self._agent_discovery_predicate = [Query._CMP_TRUE] # default: match all indications + if query: + if not isinstance(query, dict): + raise TypeError("parameter must be of type dict") + if Query._TARGET not in query or query[Query._TARGET] != {Query._TARGET_AGENT_ID:None}: + raise TypeError("query must be for an agent '%s'" % query) + if Query._PREDICATE in query: + self._agent_discovery_predicate = query[Query._PREDICATE][:] - def disable_agent_discovery(self): + def disableAgentDiscovery(self): """ Called to disable the async Agent Discovery process enabled by - calling enable_agent_discovery() + calling enableAgentDiscovery() """ - self._agent_discovery = False + self._agent_discovery_predicate = None @@ -731,7 +777,7 @@ class Console(Thread): """ try: wi = self._work_q.get(True, timeout) - except Queue.Empty: + except: return None return wi @@ -1125,8 +1171,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code - import time - from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, + from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, SchemaObjectClassFactory, ObjectIdFactory, QmfData, QmfDescribed, QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, QmfEvent) @@ -1142,7 +1187,7 @@ if __name__ == '__main__': _myConsole.add_connection( _c ) logging.info( "Finding Agent" ) - _myAgent = _myConsole.find_agent( AgentId( "redhat.com", "agent", "tross" ), 5 ) + _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 ) logging.info( "Agent Found: %s" % _myAgent ) @@ -1159,7 +1204,7 @@ if __name__ == '__main__': self._myContext = context self.WorkAvailable = False - def console_indication(self): + def indication(self): print("Indication received! context=%d" % self._myContext) self.WorkAvailable = True @@ -1168,7 +1213,7 @@ if __name__ == '__main__': _myConsole = Console(notifier=_noteMe) _myConsole.add_connection( _c ) - _myConsole.enable_agent_discovery() + _myConsole.enableAgentDiscovery() logging.info("Waiting...") diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py index ea7366945a..cd1644bca2 100644 --- a/qpid/python/qmf/test/agent_test.py +++ b/qpid/python/qmf/test/agent_test.py @@ -1,58 +1,100 @@ import logging import time +from threading import Semaphore + from qpid.messaging import * from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty, SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed, QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, - QmfEvent, SchemaMethod) + QmfEvent, SchemaMethod, Notifier) from qmfAgent import (Agent, QmfAgentData) -class MyAgent(object): - def main(self): - self._agent = Agent( "redhat.com", "qmf", "testAgent" ) +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + logging.error("Waiting for event...") + self._sema4.acquire() + logging.error("...event present") + + + +# +# An example agent application +# + +_notifier = ExampleNotifier() +_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier ) - # Dynamically construct a class schema +# Dynamically construct a class schema + +_schema = SchemaObjectClass( "MyPackage", "MyClass", + desc="A test data schema", + _pkey=["index1", "index2"] ) +# add properties +_schema.addProperty( "index1", + SchemaProperty(qmfTypes.TYPE_UINT8)) +_schema.addProperty( "index2", + SchemaProperty(qmfTypes.TYPE_LSTR)) + +# add method +_meth = SchemaMethod( _desc="A test method" ) +_meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) ) +_meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) ) +_meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) ) + +_schema.addMethod( "meth_1", _meth ) + +# Add schema to Agent - _schema = SchemaObjectClass( "MyPackage", "MyClass", - desc="A test data schema", - _pkey=["index1", "index2"] ) - # add properties - _schema.addProperty( "index1", - SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.addProperty( "index2", - SchemaProperty(qmfTypes.TYPE_LSTR)) +_agent.registerObjectClass(_schema) - # add method - _meth = SchemaMethod( _desc="A test method" ) - _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) ) +# instantiate managed data objects matching the schema - _schema.addMethod( "meth_3", _meth ) +_obj = QmfAgentData( _agent, _schema ) +_obj.setProperty("index1", 100) +_obj.setProperty("index2", "a name" ) - # Add schema to Agent +_agent.addObject( _obj ) +_agent.addObject( QmfAgentData( _agent, _schema, + _props={"index1":99, + "index2": "another name"} )) - self._agent.registerObjectClass(_schema) +## Now connect to the broker - # instantiate managed data objects matching the schema +_c = Connection("localhost") +_c.connect() +_agent.setConnection(_c) - obj = QmfAgentData( self._agent, _schema ) - obj.setProperty("index1", 100) - obj.setProperty("index2", "a name" ) +_done = False +while not _done: + try: + _notifier.waitForWork() - self._agent.addObject( QmfAgentData( self._agent, _schema, - _props={"index1":99, - "index2": "another name"} )) + _wi = _agent.getNextWorkItem(timeout=0) + while _wi: + print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) + _agent.releaseWorkItem(_wi) + _wi = _agent.getNextWorkitem(timeout=0) + except: + logging.info( "shutting down..." ) + _done = True +logging.info( "Removing connection... TBD!!!" ) +#_myConsole.remove_connection( _c, 10 ) - return None +logging.info( "Destroying agent... TBD!!!" ) +#_myConsole.destroy( 10 ) +logging.info( "******** agent test done ********" ) -app = MyAgent() -print( "s='%s'", str(app.main())) diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py new file mode 100644 index 0000000000..03b4a98808 --- /dev/null +++ b/qpid/python/qmf/test/console_test.py @@ -0,0 +1,67 @@ +import logging +import time +from threading import Semaphore + + +from qpid.messaging import * +from qmfCommon import (Notifier, Query) +from qmfConsole import Console + + +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + logging.error("Waiting for event...") + self._sema4.acquire() + logging.error("...event present") + + +logging.getLogger().setLevel(logging.INFO) + +logging.info( "Starting Connection" ) +_c = Connection("localhost") +_c.connect() + +logging.info( "Starting Console" ) + +_notifier = ExampleNotifier() +_myConsole = Console(notifier=_notifier) +_myConsole.add_connection( _c ) + +# Discover only agents from vendor "redhat.com" that +# are a "qmf" product.... +# @todo: replace "manual" query construction with +# a formal class-based Query API +_query = {Query._TARGET: {Query._TARGET_AGENT_ID:None}, + Query._PREDICATE: + [Query._LOGIC_AND, + [Query._CMP_EQ, "vendor", "redhat.com"], + [Query._CMP_EQ, "product", "qmf"]]} + +_myConsole.enableAgentDiscovery(_query) + +_done = False +while not _done: + try: + _notifier.waitForWork() + + _wi = _myConsole.get_next_workitem(timeout=0) + while _wi: + print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams()))) + _wi = _myConsole.get_next_workitem(timeout=0) + except: + logging.info( "shutting down..." ) + _done = True + +logging.info( "Removing connection" ) +_myConsole.remove_connection( _c, 10 ) + +logging.info( "Destroying console:" ) +_myConsole.destroy( 10 ) + +logging.info( "******** console test done ********" ) |