summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py168
1 files changed, 99 insertions, 69 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 8e8f4799f7..6e469a24ac 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -32,12 +32,10 @@ from qpid.messaging import Connection, Message, Empty, SendError
from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
- AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
SchemaClass, SchemaClassId, SchemaEventClass,
SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
-
# global flag that indicates which thread (if any) is
# running the console notifier callback
_callback_thread=None
@@ -280,11 +278,9 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- if _in_args:
- _in_args = _in_args.copy()
-
if self._schema:
# validate
+ _in_args = _in_args.copy()
ms = self._schema.get_method(name)
if ms is None:
raise ValueError("Method '%s' is undefined." % name)
@@ -410,6 +406,9 @@ class Agent(object):
# msg.correlation_id = str(handle)
if correlation_id:
msg.correlation_id = str(correlation_id)
+ # TRACE
+ #logging.error("!!! Console %s sending to agent %s (%s)" %
+ # (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
@@ -530,8 +529,8 @@ class Agent(object):
def _sendQuery(self, query, correlation_id=None):
"""
"""
- msg = Message(subject=makeSubject(OpCode.get_query),
- properties={"method":"request"},
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.get_query)},
content={MsgKey.query: query.map_encode()})
self._sendMsg( msg, correlation_id )
@@ -539,8 +538,8 @@ class Agent(object):
def _sendMethodReq(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(subject=makeSubject(OpCode.method_req),
- properties={"method":"request"},
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.method_req)},
content=mr_map)
self._sendMsg( msg, correlation_id )
@@ -664,6 +663,8 @@ class Console(Thread):
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
+
+ # for messages directly addressed to me
self._direct_recvr = self._session.receiver(str(self._address) +
";{create:always,"
" node-properties:"
@@ -673,18 +674,30 @@ class Console(Thread):
capacity=1)
logging.debug("my direct addr=%s" % self._direct_recvr.source)
- ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- self._announce_recvr = self._session.receiver(str(ind_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}",
- capacity=1)
- logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
-
- locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- self._locate_sender = self._session.sender(str(locate_addr) +
+ self._direct_sender = self._session.sender(str(self._address.get_node()) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ logging.debug("my direct sender=%s" % self._direct_sender.target)
+
+ # for receiving "broadcast" messages from agents
+ default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
+ self._domain)
+ self._topic_recvr = self._session.receiver(str(default_addr) +
";{create:always,"
- " node-properties:{type:topic}}")
- logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+ " node-properties:{type:topic}}",
+ capacity=1)
+ logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
+
+
+ # for sending to topic subscribers
+ topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain)
+ self._topic_sender = self._session.sender(str(topic_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}")
+ logging.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
@@ -709,11 +722,13 @@ class Console(Thread):
self._operational = False
if self.isAlive():
# kick my thread to wake it up
- logging.debug("Making temp sender for [%s]" % self._address)
- tmp_sender = self._session.sender(str(self._address))
+ logging.debug("Sending noop to wake up [%s]" % self._address)
try:
- msg = Message(subject=makeSubject(OpCode.noop))
- tmp_sender.send( msg, sync=True )
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.noop)},
+ subject=self._name,
+ content={"noop":"noop"})
+ self._direct_sender.send( msg, sync=True )
except SendError, e:
logging.error(str(e))
logging.debug("waiting for console receiver thread to exit")
@@ -721,15 +736,16 @@ class Console(Thread):
if self.isAlive():
logging.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
- self._announce_recvr.close()
- self._locate_sender.close()
+ self._direct_sender.close()
+ self._topic_recvr.close()
+ self._topic_sender.close()
self._session.close()
self._session = None
self._conn = None
logging.debug("console connection removal complete")
- def getAddress(self):
+ def get_address(self):
"""
The AMQP address this Console is listening to.
"""
@@ -752,7 +768,7 @@ class Console(Thread):
def find_agent(self, name, timeout=None ):
"""
- Given the name of a particular agent, return an instance of class Agent
+ Given the name of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
"""
@@ -769,23 +785,20 @@ class Console(Thread):
handle = self._req_correlation.allocate()
if handle == 0:
raise Exception("Can not allocate a correlation id!")
+
+ query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
+ msg = Message(subject="console.ind.locate." + name,
+ properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.agent_locate)},
+ content={MsgKey.query: query.map_encode()})
+ msg.reply_to = str(self._address)
+ msg.correlation_id = str(handle)
+ logging.debug("Sending Agent Locate (%s)" % time.time())
+ # TRACE
+ #logging.error("!!! Console %s sending agent locate (%s)" %
+ # (self._name, str(msg)))
try:
- tmp_sender = self._session.sender(str(QmfAddress.direct(name,
- self._domain))
- + ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
-
- query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject=makeSubject(OpCode.agent_locate),
- properties={"method":"request"},
- content={MsgKey.query: query.map_encode()})
- msg.reply_to = str(self._address)
- msg.correlation_id = str(handle)
- logging.debug("Sending Agent Locate (%s)" % time.time())
- tmp_sender.send( msg )
+ self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
self._req_correlation.release(handle)
@@ -807,6 +820,30 @@ class Console(Thread):
return new_agent
+ def get_agents(self):
+ """
+ Return the list of known agents.
+ """
+ self._lock.acquire()
+ try:
+ agents = self._agent_map.values()
+ finally:
+ self._lock.release()
+ return agents
+
+
+ def get_agent(self, name):
+ """
+ Return the named agent, else None if not currently available.
+ """
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ finally:
+ self._lock.release()
+ return agent
+
+
def doQuery(self, agent, query, timeout=None ):
"""
"""
@@ -898,9 +935,12 @@ class Console(Thread):
while True:
try:
- msg = self._announce_recvr.fetch(timeout=0)
+ msg = self._topic_recvr.fetch(timeout=0)
except Empty:
break
+ # TRACE:
+ # logging.error("!!! Console %s: msg on %s [%s]" %
+ # (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
while True:
@@ -908,16 +948,11 @@ class Console(Thread):
msg = self._direct_recvr.fetch(timeout = 0)
except Empty:
break
+ # TRACE
+ #logging.error("!!! Console %s: msg on %s [%s]" %
+ # (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
- for agent in self._agent_map.itervalues():
- try:
- msg = agent._event_recvr.fetch(timeout = 0)
- except Empty:
- continue
- self._dispatch(msg, _direct=False)
-
-
self._expireAgents() # check for expired agents
#if qLen == 0 and self._work_q.qsize() and self._notifier:
@@ -1026,10 +1061,10 @@ class Console(Thread):
"""
logging.debug( "Message received from Agent! [%s]" % msg )
try:
- version,opcode = parseSubject(msg.subject)
+ version,opcode = parseSubject(msg.properties.get("qmf.subject"))
# @todo: deal with version mismatch!!!
except:
- logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject)
+ logging.error("Ignoring unrecognized message '%s'" % msg)
return
cmap = {}; props = {}
@@ -1171,8 +1206,13 @@ class Console(Thread):
if not emap:
logging.debug("No '_event' field in event indication message.")
return
- # @todo: do I need to lock this???
- agent = self._agent_map.get(aname)
+
+ agent = None
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(aname)
+ finally:
+ self._lock.release()
if not agent:
logging.debug("Agent '%s' not known." % aname)
return
@@ -1251,17 +1291,6 @@ class Console(Thread):
return None
logging.debug("created agent sender %s" % agent._sender.target)
- events_addr = QmfAddress.topic(name, self._domain)
- try:
- agent._event_recvr = self._session.receiver(str(events_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}",
- capacity=1)
- except:
- logging.warning("Unable to create event receiver for %s" % name)
- return None
- logging.debug("created agent event receiver %s" % agent._event_recvr.source)
-
self._agent_map[name] = agent
finally:
self._lock.release()
@@ -1372,7 +1401,8 @@ class Console(Thread):
else:
return None
-
+ def __repr__(self):
+ return str(self._address)
# def get_packages(self):
# plist = []