summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-12-16 22:24:57 +0000
committerTed Ross <tross@apache.org>2009-12-16 22:24:57 +0000
commit9fc81d4e597ae271632d55c87fc6a61ed3f62a49 (patch)
tree5232f7fb4302794b5b659776676f1d70c3af8139
parentb9ed41f57178248064be233ea42887e2e9eed497 (diff)
downloadqpid-python-9fc81d4e597ae271632d55c87fc6a61ed3f62a49.tar.gz
QPID-2261 - Applied patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@891456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qmf/qmfAgent.py223
-rw-r--r--qpid/python/qmf/qmfCommon.py154
-rw-r--r--qpid/python/qmf/qmfConsole.py337
-rw-r--r--qpid/python/qmf/test/agent_test.py104
-rw-r--r--qpid/python/qmf/test/console_test.py67
5 files changed, 624 insertions, 261 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index 427d3e549d..f211cdb383 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -21,12 +21,12 @@ import sys
import socket
import os
import logging
-from threading import Thread
+from threading import Thread, Lock
from qpid.messaging import Connection, Message
from uuid import uuid4
from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE,
AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
- parseSubject, OpCode)
+ parseSubject, OpCode, Query, SchemaObjectClass, _doQuery)
@@ -49,6 +49,10 @@ class Agent(Thread):
self._address = str(self._id)
self._notifier = notifier
self._conn = None
+ self._lock = Lock()
+ self._data_schema = {}
+ self._event_schema = {}
+ self._agent_data = {}
def getAgentId(self):
return AgentId(self.vendor, self.product, self.name)
@@ -61,52 +65,66 @@ class Agent(Thread):
self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
self._running = True
self.start()
+
+ def registerObjectClass(self, schema):
+ """
+ Register an instance of a SchemaObjectClass with this agent
+ """
+ # @todo: need to update subscriptions
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(schema, SchemaObjectClass):
+ raise TypeError("SchemaObjectClass instance expected")
+ self._lock.acquire()
+ try:
+ self._data_schema[schema.getClassId()] = schema
+ finally:
+ self._lock.release()
- def _dispatch(self, msg, _direct=False):
+
+ def registerEventClass(self, cls):
+ logging.error("!!!Agent.registerEventClass() TBD!!!")
+
+ def raiseEvent(self, qmfEvent):
+ logging.error("!!!Agent.raiseEvent() TBD!!!")
+
+ def addObject(self, data ):
"""
- @param _direct: True if msg directly addressed to this agent.
+ Register an instance of a QmfAgentData object.
"""
+ # @todo: need to update subscriptions
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(data, QmfAgentData):
+ raise TypeError("QmfAgentData instance expected")
+
+ self._lock.acquire()
try:
- version,opcode = parseSubject(msg.subject)
- except:
- logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
- return
+ self._agent_data[data.getObjectId()] = data
+ finally:
+ self._lock.release()
- cmap = {}; props={}
- if msg.content_type == "amqp/map":
- cmap = msg.content
- if msg.properties:
- props = msg.properties
- if opcode == OpCode.agent_locate:
- reply = False
- if "method" in props and props["method"] == "request":
- if "query" in cmap:
- if self._doQuery(cmap["query"]):
- reply=True
- else:
- reply=True
-
- if reply:
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- m = Message( subject=makeSubject(OpCode.agent_locate),
- properties={"method":"response"},
- content={"name": {"vendor":"redhat.com",
- "product":"agent",
- "name":"tross"}},
- correlation_id=msg.correlation_id)
- tmp_snd.send(m)
- logging.debug("reply-to [%s] sent" % msg.reply_to)
- except e:
- logging.error("Failed to send reply to msg '%s'" % str(e))
- else:
- logging.debug("Ignoring invalid agent-locate msg")
- else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
- % opcode)
+ def methodResponse(self, context, status, text, arguments):
+ logging.error("!!!Agent.methodResponse() TBD!!!")
+ def getWorkItemCount(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+
+ def getNextWorkItem(self, timeout=None):
+ """
+ Obtains the next pending work item, or None if none available.
+ """
+ logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+
+ def releaseWorkItem(self, wi):
+ """
+ Releases a WorkItem instance obtained by getNextWorkItem(). Called when
+ the application has finished processing the WorkItem.
+ """
+ logging.error("!!!Agent.releaseWorkItem() TBD!!!")
def run(self):
@@ -114,7 +132,7 @@ class Agent(Thread):
while self._running:
try:
msg = self._locate_receiver.fetch(1)
- logging.info("Agent Locate Rcvd: '%s'" % msg)
+ logging.debug("Agent Locate Rcvd: '%s'" % msg)
if msg.content_type == "amqp/map":
self._dispatch(msg, _direct=False)
except KeyboardInterrupt:
@@ -124,7 +142,7 @@ class Agent(Thread):
try:
msg = self._direct_receiver.fetch(1)
- logging.info("Agent Msg Rcvd: '%s'" % msg)
+ logging.debug("Agent Msg Rcvd: '%s'" % msg)
if msg.content_type == "amqp/map":
self._dispatch(msg, _direct=True)
except KeyboardInterrupt:
@@ -132,61 +150,100 @@ class Agent(Thread):
except:
pass
+ # @todo: actually implement the periodic agent-ind
+ # message generation!
count+= 1
if count == 5:
count = 0
- m = Message( subject=makeSubject(OpCode.agent_ind),
- properties={"method":"indication"},
- content={"name": {"vendor":"redhat.com",
- "product":"agent",
- "name":"tross"}} )
- self._ind_sender.send(m)
- logging.info("Agent Indication Sent")
-
-
- def registerObjectClass(self, cls):
- logging.error("!!!Agent.registerObjectClass() TBD!!!")
+ self._ind_sender.send(self._makeAgentIndMsg())
+ logging.debug("Agent Indication Sent")
- def registerEventClass(self, cls):
- logging.error("!!!Agent.registerEventClass() TBD!!!")
+ #
+ # Private:
+ #
- def raiseEvent(self, qmfEvent):
- logging.error("!!!Agent.raiseEvent() TBD!!!")
+ def _makeAgentIndMsg(self):
+ """
+ Create an agent indication message identifying this agent
+ """
+ return Message( subject=makeSubject(OpCode.agent_ind),
+ properties={"method":"response"},
+ content={Query._TARGET_AGENT_ID:
+ self.getAgentId().mapEncode()})
- def addObject(self, qmfAgentData ):
- logging.error("!!!Agent.addObject() TBD!!!")
- def methodResponse(self, context, status, text, arguments):
- logging.error("!!!Agent.methodResponse() TBD!!!")
- def getWorkItemCount(self):
- """
- Returns the count of pending WorkItems that can be retrieved.
+ def _dispatch(self, msg, _direct=False):
"""
- logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+ Process a message from a console.
- def getNextWorkItem(self, timeout=None):
- """
- Obtains the next pending work item, or None if none available.
+ @param _direct: True if msg directly addressed to this agent.
"""
- logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+ logging.error( "Message received from Console! [%s]" % msg )
+ try:
+ version,opcode = parseSubject(msg.subject)
+ except:
+ logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+ return
- def releaseWorkItem(self, wi):
+ cmap = {}; props={}
+ if msg.content_type == "amqp/map":
+ cmap = msg.content
+ if msg.properties:
+ props = msg.properties
+
+ if opcode == OpCode.agent_locate:
+ self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
+ elif opcode == OpCode.get_query:
+ logging.warning("!!! GET_QUERY TBD !!!")
+ elif opcode == OpCode.method_req:
+ logging.warning("!!! METHOD_REQ TBD !!!")
+ elif opcode == OpCode.cancel_subscription:
+ logging.warning("!!! CANCEL_SUB TBD !!!")
+ elif opcode == OpCode.create_subscription:
+ logging.warning("!!! CREATE_SUB TBD !!!")
+ elif opcode == OpCode.renew_subscription:
+ logging.warning("!!! RENEW_SUB TBD !!!")
+ elif opcode == OpCode.schema_query:
+ logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.noop:
+ logging.debug("No-op msg received.")
+ else:
+ logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ % opcode)
+
+ def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
"""
- Releases a WorkItem instance obtained by getNextWorkItem(). Called when
- the application has finished processing the WorkItem.
+ Process a received agent-locate message
"""
- logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+ logging.debug("_handleAgentLocateMsg")
+
+ reply = True
+ if "method" in props and props["method"] == "request":
+ if "query" in cmap:
+ query = cmap["query"]
+ # is the query an agent locate?
+ if Query._TARGET in query and query[Query._TARGET] == {Query._TARGET_AGENT_ID:None}:
+ if Query._PREDICATE in query:
+ # does this agent match the predicate?
+ reply = _doQuery( query[Query._PREDICATE], self.getAgentId().mapEncode() )
+ else:
+ reply = False
+ logging.debug("Ignoring query - not an agent-id query: '%s'" % query)
+ reply=True
+
+ if reply:
+ try:
+ tmp_snd = self._session.sender( msg.reply_to )
+ m = self._makeAgentIndMsg()
+ m.correlation_id = msg.correlation_id
+ tmp_snd.send(m)
+ logging.debug("agent-ind sent to [%s]" % msg.reply_to)
+ except:
+ logging.error("Failed to send reply to agent-ind msg '%s'" % msg)
+ else:
+ logging.debug("agent-locate msg not mine - no reply sent")
- def _doQuery(self, query):
- # query = cmap["query"]
- # if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and
- # "product" in query and (query["product"] == "*" or query["product"] == self.product) and
- # "name" in query and (query["name"] == "*" or query["name"] == self.name)):
- # logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name))
- # logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id))
- logging.error("!!!Agent._doQuery() TBD!!!")
- return True
##==============================================================================
@@ -228,7 +285,7 @@ class QmfAgentData(QmfManaged):
if __name__ == '__main__':
import time
- #logging.getLogger().setLevel(logging.INFO)
+ logging.getLogger().setLevel(logging.INFO)
logging.info( "Starting Connection" )
_c = Connection("localhost")
_c.connect()
diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py
index cfdc87806b..daa5cea2fd 100644
--- a/qpid/python/qmf/qmfCommon.py
+++ b/qpid/python/qmf/qmfCommon.py
@@ -48,9 +48,28 @@ AMQP_QMF_VERSION = 4
AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
class OpCode(object):
+ noop = "noop"
+
+ # codes sent by a console and processed by the agent
agent_locate = "agent-locate"
+ cancel_subscription = "cancel-subscription"
+ create_subscription = "create-subscription"
+ get_query = "get_query"
+ method_req = "method"
+ renew_subscription = "renew-subscription"
+ schema_query = "schema-query" # @todo: deprecate
+
+ # codes sent by the agent to a console
agent_ind = "agent"
- noop = "noop"
+ data_ind = "data"
+ event_ind = "event"
+ managed_object = "managed-object"
+ object_ind = "object"
+ response = "response"
+ schema_ind="schema" # @todo: deprecate
+
+
+
def makeSubject(_code):
"""
@@ -69,6 +88,19 @@ def parseSubject(_sub):
return _sub[3:].split('.', 1)
+class Notifier(object):
+ """
+ Virtual base class that defines a call back which alerts the application that
+ a QMF Console notification is pending.
+ """
+ def indication(self):
+ """
+ Called when one or more items are ready for the application to process.
+ This method may be called by an internal QMF library thread. Its purpose is to
+ indicate that the application should process pending work items.
+ """
+ raise Exception("The indication method must be overridden by the application!")
+
##==============================================================================
@@ -966,7 +998,127 @@ class MethodResponse(object):
# ##==============================================================================
+
+def _doQuery(predicate, params ):
+ """
+ Given the predicate from a query, and a map of named parameters, apply the predicate
+ to the parameters, and return True or False.
+ """
+ if type(predicate) != list or len(predicate) < 1:
+ return False
+
+ opr = predicate[0]
+ if opr == Query._CMP_TRUE:
+ logging.info("_doQuery() TRUE: [%s]" % predicate )
+ return True
+ elif opr == Query._CMP_FALSE:
+ logging.info("_doQuery() FALSE: [%s]" % predicate )
+ return False
+ elif opr == Query._LOGIC_AND:
+ logging.info("_doQuery() AND: [%s]" % predicate )
+ rc = False
+ for exp in predicate[1:]:
+ rc = _doQuery( exp, params )
+ if not rc:
+ break
+ return rc
+
+ elif opr == Query._LOGIC_OR:
+ logging.info("_doQuery() OR: [%s]" % predicate )
+ rc = False
+ for exp in predicate[1:]:
+ rc = _doQuery( exp, params )
+ if rc:
+ break
+ return rc
+
+ elif opr == Query._LOGIC_NOT:
+ logging.info("_doQuery() NOT: [%s]" % predicate )
+ if len(predicate) != 2:
+ logging.warning("Malformed query not-expression received: '%s'" % predicate)
+ return False
+ return not _doQuery( predicate[1:], params )
+
+ elif opr in [Query._CMP_EQ, Query._CMP_NE, Query._CMP_LT,
+ Query._CMP_LE, Query._CMP_GT, Query._CMP_GE,
+ Query._CMP_RE]:
+ if len(predicate) != 3:
+ logging.warning("Malformed query compare expression received: '%s'" % predicate)
+ return False
+ # @todo: support regular expression match
+ name = predicate[1]
+ if name not in params:
+ logging.warning("Malformed query, attribute '%s' not present." % name)
+ return False
+ arg1 = params[name]
+ arg1Type = type(arg1)
+ logging.info("_doQuery() CMP: [%s] value='%s'" % (predicate, arg1) )
+ try:
+ arg2 = arg1Type(predicate[2])
+ if opr == Query._CMP_EQ: return arg1 == arg2
+ if opr == Query._CMP_NE: return arg1 != arg2
+ if opr == Query._CMP_LT: return arg1 < arg2
+ if opr == Query._CMP_LE: return arg1 <= arg2
+ if opr == Query._CMP_GT: return arg1 > arg2
+ if opr == Query._CMP_GE: return arg1 >= arg2
+ if opr == Query._CMP_RE:
+ logging.error("!!! RE QUERY TBD !!!")
+ return False
+ except:
+ logging.warning("Malformed query, unable to compare '%s'" % predicate)
+ return False
+
+ elif opr == Query._CMP_PRESENT:
+ logging.info("_doQuery() PRESENT: [%s]" % predicate )
+ if len(predicate) != 2:
+ logging.warning("Malformed query present expression received: '%s'" % predicate)
+ return False
+ name = predicate[1]
+ return name in params
+
+ else:
+ logging.warning("Unknown query operator received: '%s'" % opr)
+ return False
+
+
+
class Query:
+ _TARGET="what"
+ _PREDICATE="where"
+
+ _TARGET_PACKAGES="_packages"
+ _TARGET_OBJECT_ID="_object_id"
+ _TARGET_SCHEMA="_schema"
+ _TARGET_SCHEMA_ID="_schema_id"
+ _TARGET_MGT_DATA="_mgt_data"
+ _TARGET_AGENT_ID="_agent_id"
+
+ _PRED_PACKAGE="_package_name"
+ _PRED_CLASS="_class_name"
+ _PRED_TYPE="_type"
+ _PRED_HASH="_has_str"
+ _PRED_SCHEMA_ID="_schema_id"
+ _PRED_VENDOR="_vendor"
+ _PRED_PRODUCT="_product"
+ _PRED_NAME="_name"
+ _PRED_AGENT_ID="_agent_id"
+ _PRED_PRIMARY_KEY="_primary_key"
+
+ _CMP_EQ="eq"
+ _CMP_NE="ne"
+ _CMP_LT="lt"
+ _CMP_LE="le"
+ _CMP_GT="gt"
+ _CMP_GE="ge"
+ _CMP_RE="re_match"
+ _CMP_PRESENT="exists"
+ _CMP_TRUE="true"
+ _CMP_FALSE="false"
+
+ _LOGIC_AND="and"
+ _LOGIC_OR="or"
+ _LOGIC_NOT="not"
+
def __init__(self, kwargs={}):
pass
# if "impl" in kwargs:
diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py
index 24f263adab..e956b07dab 100644
--- a/qpid/python/qmf/qmfConsole.py
+++ b/qpid/python/qmf/qmfConsole.py
@@ -30,7 +30,8 @@ from threading import Condition
from qpid.messaging import *
from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION,
- AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode)
+ AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode,
+ Query, AgentIdFactory, Notifier, _doQuery)
@@ -67,9 +68,8 @@ class _Mailbox(object):
def fetch(self, timeout=None):
self._cv.acquire()
try:
- if len(self._msgs):
- return self._msgs.pop()
- self._cv.wait(timeout)
+ if len(self._msgs) == 0:
+ self._cv.wait(timeout)
if len(self._msgs):
return self._msgs.pop()
return None
@@ -169,6 +169,19 @@ class SequencedWaiter(object):
self.lock.release()
+ def isValid(self, seq):
+ """
+ True if seq is in use, else False (seq is unknown)
+ """
+ seq = long(seq)
+ self.lock.acquire()
+ try:
+ return seq in self.pending
+ finally:
+ self.lock.release()
+ return False
+
+
#class ObjectProxy(QmfObject):
class ObjectProxy(object):
@@ -198,11 +211,11 @@ class ObjectProxy(object):
"""
if not self._agent:
raise Exception("No Agent associated with this object")
- newer = self._agent.get_object(Query({"object_id":object_id}), timeout)
+ newer = self._agent.get_object(Query({"object_id":None}), timeout)
if newer == None:
logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
- self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class
+ #self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class
### def _merge_update(self, newerObject):
### ??? in Rafi's console.py::Object Class
@@ -216,32 +229,36 @@ class ObjectProxy(object):
-class AgentProxy(object):
+class Agent(object):
"""
A local representation of a remote agent managed by this console.
"""
- def __init__(self, name):
+ def __init__(self, agent_id, console):
"""
@type name: AgentId
@param name: uniquely identifies this agent in the AMQP domain.
"""
- if not name or not isinstance(name, AgentId):
- raise Exception( "Attempt to create an Agent without supplying a valid agent name." );
-
- self._name = name
- self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name)
- self._console = None
+ if not isinstance(agent_id, AgentId):
+ raise TypeError("parameter must be an instance of class AgentId")
+ if not isinstance(console, Console):
+ raise TypeError("parameter must be an instance of class Console")
+
+ self._id = agent_id
+ self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)
+ self._console = console
self._sender = None
self._packages = [] # list of package names known to this agent
self._classes = {} # dict [key:class] of classes known to this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
- self._exists = False # true when Agent Announce is received from this agent
- logging.debug( "Created AgentProxy with address: [%s]" % self._address )
+ self._announce_timestamp = long(0) # timestamp when last announce received
+ logging.debug( "Created Agent with address: [%s]" % self._address )
- def key(self):
- return str(self._name)
+ def getAgentId(self):
+ return self._id
+ def isActive(self):
+ return self._announce_timestamp != 0
def _send_msg(self, msg):
"""
@@ -250,26 +267,11 @@ class AgentProxy(object):
msg.reply_to = self._console.address()
handle = self._console._req_correlation.allocate()
if handle == 0:
- raise Exception("Can not allocate a sequence id!")
+ raise Exception("Can not allocate a correlation id!")
msg.correlation_id = str(handle)
self._sender.send(msg)
return handle
-
-
- def _fetch_reply_msg(self, handle, timeout=None):
- """
- Low-level routine to wait for an expected reply from the agent.
- """
- if handle == 0:
- raise Exception("Invalid handle")
- msg = self._console._req_correlation.get_data( handle, timeout )
- if not msg:
- logging.debug("timed out waiting for reply message")
- self._console._req_correlation.release( handle )
- return msg
-
-
def get_packages(self):
"""
Return a list of the names of all packages known to this agent.
@@ -371,22 +373,6 @@ class WorkItem(object):
-class Notifier(object):
- """
- Virtual base class that defines a call back which alerts the application that
- a QMF Console notification is pending.
- """
- def console_indication(self):
- """
- Called when one or more console items are ready for the console application to process.
- This method may be called by the internal console management thread. Its purpose is to
- indicate that the console application should process pending items.
- """
- pass
-
-
-
-
class Console(Thread):
"""
A Console manages communications to a collection of agents on behalf of an application.
@@ -408,16 +394,17 @@ class Console(Thread):
self._notifier = notifier
self._conn = None
self._session = None
- # dict of "agent-direct-address":AgentProxy entries
+ self._lock = Lock()
+ # dict of "agent-direct-address":class Agent entries
self._agent_map = {}
- self._agent_map_lock = Lock()
self._direct_recvr = None
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
self._req_correlation = SequencedWaiter()
self._operational = False
- self._agent_discovery = False
+ self._agent_discovery_predicate = None
+ self._default_timeout = 60
# lock out run() thread
self._cv = Condition()
# for passing WorkItems to the application
@@ -514,78 +501,96 @@ class Console(Thread):
return self._address
- def create_agent( self, agent_name ):
+ def _createAgent( self, agent_id ):
"""
Factory to create/retrieve an agent for this console
"""
- if not isinstance(agent_name, AgentId):
- raise TypeError("agent_name must be an instance of AgentId")
+ if not isinstance(agent_id, AgentId):
+ raise TypeError("parameter must be an instance of class AgentId")
- agent = AgentProxy(agent_name)
-
- self._agent_map_lock.acquire()
+ self._lock.acquire()
try:
- if agent_name in self._agent_map:
- return self._agent_map[agent_name]
+ if agent_id in self._agent_map:
+ return self._agent_map[agent_id]
- agent._console = self
+ agent = Agent(agent_id, self)
agent._sender = self._session.sender(agent._address)
- self._agent_map[agent_name] = agent
+ self._agent_map[agent_id] = agent
finally:
- self._agent_map_lock.release()
+ self._lock.release()
return agent
- def destroy_agent( self, agent ):
+ def destroyAgent( self, agent ):
"""
Undoes create.
"""
- if not isinstance(agent, AgentProxy):
- raise TypeError("agent must be an instance of AgentProxy")
+ if not isinstance(agent, Agent):
+ raise TypeError("agent must be an instance of class Agent")
- self._agent_map_lock.acquire()
+ self._lock.acquire()
try:
- if agent._name in self._agent_map:
- del self._agent_map[agent._name]
+ if agent._id in self._agent_map:
+ del self._agent_map[agent._id]
finally:
- self._agent_map_lock.release()
+ self._lock.release()
- def find_agent(self, agent_name, timeout=None ):
+ def findAgent(self, agent_id, timeout=None ):
"""
- Given the name of a particular agent, return an AgentProxy representing
- that agent. Return None if the agent does not exist.
+ Given the id of a particular agent, return an instance of class Agent
+ representing that agent. Return None if the agent does not exist.
"""
- self._agent_map_lock.acquire()
+ if not isinstance(agent_id, AgentId):
+ raise TypeError("parameter must be an instance of class AgentId")
+
+ self._lock.acquire()
try:
- if agent_name in self._agent_map:
- return self._agent_map[agent_name]
+ if agent_id in self._agent_map:
+ return self._agent_map[agent_id]
finally:
- self._agent_map_lock.release()
-
- new_agent = self.create_agent(agent_name)
- msg = Message(subject=makeSubject(OpCode.agent_locate),
- properties={"method":"request"},
- content={"query": {"vendor" : agent_name.vendor(),
- "product" : agent_name.product(),
- "name" : agent_name.name()}})
- handle = new_agent._send_msg(msg)
- if handle == 0:
- raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name))
+ self._lock.release()
+
+ # agent not present yet - ping it with an agent_locate
- msg = new_agent._fetch_reply_msg(handle, timeout)
- if not msg:
- logging.debug("Unable to contact agent '%s' - no reply." % agent_name)
- self.destroy_agent(new_agent)
+ handle = self._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+ try:
+ tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id))
+ msg = Message(subject=makeSubject(OpCode.agent_locate),
+ properties={"method":"request"},
+ content={"query": {Query._TARGET: {Query._TARGET_AGENT_ID:None},
+ Query._PREDICATE:
+ [Query._LOGIC_AND,
+ [Query._CMP_EQ, "vendor", agent_id.vendor()],
+ [Query._CMP_EQ, "product", agent_id.product()],
+ [Query._CMP_EQ, "name", agent_id.name()]]}})
+ msg.reply_to = self._address
+ msg.correlation_id = str(handle)
+ tmp_sender.send( msg )
+ except SendError, e:
+ logging.error(str(e))
+ self._req_correlation.release(handle)
return None
- # @todo - for now, dump the message
- logging.info( "agent-locate reply received for %s" % agent_name)
- return new_agent
+ if not timeout:
+ timeout = self._default_timeout
+
+ new_agent = None
+ self._req_correlation.get_data( handle, timeout )
+ self._req_correlation.release(handle)
+ self._lock.acquire()
+ try:
+ if agent_id in self._agent_map:
+ new_agent = self._agent_map[agent_id]
+ finally:
+ self._lock.release()
+ return new_agent
def run(self):
@@ -600,14 +605,14 @@ class Console(Thread):
try:
msg = self._announce_recvr.fetch(timeout = 0)
if msg:
- self._rcv_announce(msg)
+ self._dispatch(msg, _direct=False)
except:
pass
try:
msg = self._direct_recvr.fetch(timeout = 0)
if msg:
- self._rcv_direct(msg)
+ self._dispatch(msg, _direct=True)
except:
pass
@@ -626,7 +631,7 @@ class Console(Thread):
_callback_thread = currentThread()
logging.info("Calling console indication")
- self._notifier.console_indication()
+ self._notifier.indication()
_callback_thread = None
while self._operational and \
@@ -640,79 +645,120 @@ class Console(Thread):
# called by run() thread ONLY
#
- def _rcv_announce(self, msg):
+ def _dispatch(self, msg, _direct=True):
"""
- PRIVATE: Process a message received on the announce receiver
+ PRIVATE: Process a message received from an Agent
"""
- logging.info( "Announce message received!" )
+ logging.error( "Message received from Agent! [%s]" % msg )
try:
version,opcode = parseSubject(msg.subject)
+ # @todo: deal with version mismatch!!!
except:
- logging.debug("Ignoring unrecognized broadcast message '%s'" % msg.subject)
+ logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject)
return
- amap = {}; props = {}
+ cmap = {}; props = {}
if msg.content_type == "amqp/map":
- amap = msg.content
+ cmap = msg.content
if msg.properties:
props = msg.properties
if opcode == OpCode.agent_ind:
- # agent indication
- if "name" in amap:
- if self._agent_discovery:
- ind = amap["name"]
- if "vendor" in ind and "product" in ind and "name" in ind:
-
- agent = self.create_agent(AgentId( ind["vendor"],
- ind["product"],
- ind["name"] ))
- if not agent._exists:
- # new agent
- agent._exists = True
- logging.info("AGENT_ADDED for %s" % agent)
- wi = WorkItem(WorkItem.AGENT_ADDED,
- {"agent": agent})
- self._work_q.put(wi)
+ self._handleAgentIndMsg( msg, cmap, version, _direct )
+ elif opcode == OpCode.data_ind:
+ logging.warning("!!! data_ind TBD !!!")
+ elif opcode == OpCode.event_ind:
+ logging.warning("!!! event_ind TBD !!!")
+ elif opcode == OpCode.managed_object:
+ logging.warning("!!! managed_object TBD !!!")
+ elif opcode == OpCode.object_ind:
+ logging.warning("!!! object_ind TBD !!!")
+ elif opcode == OpCode.response:
+ logging.warning("!!! response TBD !!!")
+ elif opcode == OpCode.schema_ind:
+ logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.noop:
+ logging.debug("No-op msg received.")
else:
logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
-
- # called by run() thread ONLY
- #
- def _rcv_direct(self, msg):
+ def _handleAgentIndMsg(self, msg, cmap, version, direct):
"""
- PRIVATE: Process a message sent to my direct receiver
+ Process a received agent-ind message. This message may be a response to a
+ agent-locate, or it can be an unsolicited agent announce.
"""
- logging.info( "direct message received!" )
+ logging.debug("_handleAgentIndMsg '%s'" % msg)
+
+ if Query._TARGET_AGENT_ID in cmap:
+ try:
+ agent_id = AgentIdFactory(cmap[Query._TARGET_AGENT_ID])
+ except:
+ logging.debug("Bad agent-ind message received: '%s'" % msg)
+ return
+
+ ignore = True
+ matched = False
+ correlated = False
if msg.correlation_id:
- self._req_correlation.put_data(msg.correlation_id, msg)
+ correlated = self._req_correlation.isValid(msg.correlation_id)
+
+ if direct and correlated:
+ ignore = False
+ elif self._agent_discovery_predicate:
+ matched = _doQuery( self._agent_discovery_predicate,
+ agent_id.mapEncode() )
+ ignore = not matched
+
+ if not ignore:
+ agent = None
+ self._lock.acquire()
+ try:
+ if agent_id in self._agent_map:
+ agent = self._agent_map[agent_id]
+ finally:
+ self._lock.release()
+ if not agent:
+ # need to create and add a new agent
+ agent = self._createAgent(agent_id)
+ old_timestamp = agent._announce_timestamp
+ agent._announce_timestamp = time.time()
- def enable_agent_discovery(self):
+ if old_timestamp == 0 and matched:
+ logging.debug("AGENT_ADDED for %s" % agent)
+ wi = WorkItem(WorkItem.AGENT_ADDED,
+ {"agent": agent})
+ self._work_q.put(wi)
+
+ if correlated:
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+ def enableAgentDiscovery(self, query=None):
"""
Called to enable the asynchronous Agent Discovery process.
Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
"""
- if not self._agent_discovery:
- self._agent_discovery = True
- msg = Message(subject=makeSubject(OpCode.agent_locate),
- properties={"method":"request"},
- content={"query": {"vendor": "*",
- "product": "*",
- "name": "*"}})
- self._locate_sender.send(msg)
-
+ self._agent_discovery_predicate = [Query._CMP_TRUE] # default: match all indications
+ if query:
+ if not isinstance(query, dict):
+ raise TypeError("parameter must be of type dict")
+ if Query._TARGET not in query or query[Query._TARGET] != {Query._TARGET_AGENT_ID:None}:
+ raise TypeError("query must be for an agent '%s'" % query)
+ if Query._PREDICATE in query:
+ self._agent_discovery_predicate = query[Query._PREDICATE][:]
- def disable_agent_discovery(self):
+ def disableAgentDiscovery(self):
"""
Called to disable the async Agent Discovery process enabled by
- calling enable_agent_discovery()
+ calling enableAgentDiscovery()
"""
- self._agent_discovery = False
+ self._agent_discovery_predicate = None
@@ -731,7 +777,7 @@ class Console(Thread):
"""
try:
wi = self._work_q.get(True, timeout)
- except Queue.Empty:
+ except:
return None
return wi
@@ -1125,8 +1171,7 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
- import time
- from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory,
+ from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory,
SchemaObjectClassFactory, ObjectIdFactory, QmfData, QmfDescribed,
QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
QmfEvent)
@@ -1142,7 +1187,7 @@ if __name__ == '__main__':
_myConsole.add_connection( _c )
logging.info( "Finding Agent" )
- _myAgent = _myConsole.find_agent( AgentId( "redhat.com", "agent", "tross" ), 5 )
+ _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 )
logging.info( "Agent Found: %s" % _myAgent )
@@ -1159,7 +1204,7 @@ if __name__ == '__main__':
self._myContext = context
self.WorkAvailable = False
- def console_indication(self):
+ def indication(self):
print("Indication received! context=%d" % self._myContext)
self.WorkAvailable = True
@@ -1168,7 +1213,7 @@ if __name__ == '__main__':
_myConsole = Console(notifier=_noteMe)
_myConsole.add_connection( _c )
- _myConsole.enable_agent_discovery()
+ _myConsole.enableAgentDiscovery()
logging.info("Waiting...")
diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py
index ea7366945a..cd1644bca2 100644
--- a/qpid/python/qmf/test/agent_test.py
+++ b/qpid/python/qmf/test/agent_test.py
@@ -1,58 +1,100 @@
import logging
import time
+from threading import Semaphore
+
from qpid.messaging import *
from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty,
SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed,
QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory,
- QmfEvent, SchemaMethod)
+ QmfEvent, SchemaMethod, Notifier)
from qmfAgent import (Agent, QmfAgentData)
-class MyAgent(object):
- def main(self):
- self._agent = Agent( "redhat.com", "qmf", "testAgent" )
+class ExampleNotifier(Notifier):
+ def __init__(self):
+ self._sema4 = Semaphore(0) # locked
+
+ def indication(self):
+ self._sema4.release()
+
+ def waitForWork(self):
+ logging.error("Waiting for event...")
+ self._sema4.acquire()
+ logging.error("...event present")
+
+
+
+#
+# An example agent application
+#
+
+_notifier = ExampleNotifier()
+_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier )
- # Dynamically construct a class schema
+# Dynamically construct a class schema
+
+_schema = SchemaObjectClass( "MyPackage", "MyClass",
+ desc="A test data schema",
+ _pkey=["index1", "index2"] )
+# add properties
+_schema.addProperty( "index1",
+ SchemaProperty(qmfTypes.TYPE_UINT8))
+_schema.addProperty( "index2",
+ SchemaProperty(qmfTypes.TYPE_LSTR))
+
+# add method
+_meth = SchemaMethod( _desc="A test method" )
+_meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
+_meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
+_meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
+
+_schema.addMethod( "meth_1", _meth )
+
+# Add schema to Agent
- _schema = SchemaObjectClass( "MyPackage", "MyClass",
- desc="A test data schema",
- _pkey=["index1", "index2"] )
- # add properties
- _schema.addProperty( "index1",
- SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.addProperty( "index2",
- SchemaProperty(qmfTypes.TYPE_LSTR))
+_agent.registerObjectClass(_schema)
- # add method
- _meth = SchemaMethod( _desc="A test method" )
- _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
+# instantiate managed data objects matching the schema
- _schema.addMethod( "meth_3", _meth )
+_obj = QmfAgentData( _agent, _schema )
+_obj.setProperty("index1", 100)
+_obj.setProperty("index2", "a name" )
- # Add schema to Agent
+_agent.addObject( _obj )
+_agent.addObject( QmfAgentData( _agent, _schema,
+ _props={"index1":99,
+ "index2": "another name"} ))
- self._agent.registerObjectClass(_schema)
+## Now connect to the broker
- # instantiate managed data objects matching the schema
+_c = Connection("localhost")
+_c.connect()
+_agent.setConnection(_c)
- obj = QmfAgentData( self._agent, _schema )
- obj.setProperty("index1", 100)
- obj.setProperty("index2", "a name" )
+_done = False
+while not _done:
+ try:
+ _notifier.waitForWork()
- self._agent.addObject( QmfAgentData( self._agent, _schema,
- _props={"index1":99,
- "index2": "another name"} ))
+ _wi = _agent.getNextWorkItem(timeout=0)
+ while _wi:
+ print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
+ _agent.releaseWorkItem(_wi)
+ _wi = _agent.getNextWorkitem(timeout=0)
+ except:
+ logging.info( "shutting down..." )
+ _done = True
+logging.info( "Removing connection... TBD!!!" )
+#_myConsole.remove_connection( _c, 10 )
- return None
+logging.info( "Destroying agent... TBD!!!" )
+#_myConsole.destroy( 10 )
+logging.info( "******** agent test done ********" )
-app = MyAgent()
-print( "s='%s'", str(app.main()))
diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py
new file mode 100644
index 0000000000..03b4a98808
--- /dev/null
+++ b/qpid/python/qmf/test/console_test.py
@@ -0,0 +1,67 @@
+import logging
+import time
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmfCommon import (Notifier, Query)
+from qmfConsole import Console
+
+
+class ExampleNotifier(Notifier):
+ def __init__(self):
+ self._sema4 = Semaphore(0) # locked
+
+ def indication(self):
+ self._sema4.release()
+
+ def waitForWork(self):
+ logging.error("Waiting for event...")
+ self._sema4.acquire()
+ logging.error("...event present")
+
+
+logging.getLogger().setLevel(logging.INFO)
+
+logging.info( "Starting Connection" )
+_c = Connection("localhost")
+_c.connect()
+
+logging.info( "Starting Console" )
+
+_notifier = ExampleNotifier()
+_myConsole = Console(notifier=_notifier)
+_myConsole.add_connection( _c )
+
+# Discover only agents from vendor "redhat.com" that
+# are a "qmf" product....
+# @todo: replace "manual" query construction with
+# a formal class-based Query API
+_query = {Query._TARGET: {Query._TARGET_AGENT_ID:None},
+ Query._PREDICATE:
+ [Query._LOGIC_AND,
+ [Query._CMP_EQ, "vendor", "redhat.com"],
+ [Query._CMP_EQ, "product", "qmf"]]}
+
+_myConsole.enableAgentDiscovery(_query)
+
+_done = False
+while not _done:
+ try:
+ _notifier.waitForWork()
+
+ _wi = _myConsole.get_next_workitem(timeout=0)
+ while _wi:
+ print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams())))
+ _wi = _myConsole.get_next_workitem(timeout=0)
+ except:
+ logging.info( "shutting down..." )
+ _done = True
+
+logging.info( "Removing connection" )
+_myConsole.remove_connection( _c, 10 )
+
+logging.info( "Destroying console:" )
+_myConsole.destroy( 10 )
+
+logging.info( "******** console test done ********" )