summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qmf2/agent.py231
-rw-r--r--python/qmf2/common.py91
-rw-r--r--python/qmf2/console.py168
-rw-r--r--python/qmf2/tests/__init__.py6
-rw-r--r--python/qmf2/tests/agent_discovery.py255
-rw-r--r--python/qmf2/tests/events.py15
6 files changed, 520 insertions, 246 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py
index c6a518ca31..88aee8034f 100644
--- a/python/qmf2/agent.py
+++ b/python/qmf2/agent.py
@@ -1,4 +1,3 @@
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -25,10 +24,9 @@ import Queue
from threading import Thread, Lock, currentThread
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
- makeSubject, parseSubject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+from common import (makeSubject, parseSubject, OpCode, QmfQuery,
+ SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
@@ -85,6 +83,7 @@ class Agent(Thread):
self.name = str(name)
self._domain = _domain
+ self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
self._heartbeat_interval = _heartbeat_interval
self._max_msg_size = _max_msg_size
@@ -93,9 +92,9 @@ class Agent(Thread):
self._conn = None
self._session = None
self._direct_receiver = None
- self._locate_receiver = None
- self._ind_sender = None
- self._event_sender = None
+ self._topic_receiver = None
+ self._direct_sender = None
+ self._topic_sender = None
self._lock = Lock()
self._packages = {}
@@ -127,8 +126,8 @@ class Agent(Thread):
self._conn = conn
self._session = self._conn.session()
- my_addr = QmfAddress.direct(self.name, self._domain)
- self._direct_receiver = self._session.receiver(str(my_addr) +
+ # for messages directly addressed to me
+ self._direct_receiver = self._session.receiver(str(self._address) +
";{create:always,"
" node-properties:"
" {type:topic,"
@@ -137,28 +136,33 @@ class Agent(Thread):
capacity=self._capacity)
logging.debug("my direct addr=%s" % self._direct_receiver.source)
- locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- self._locate_receiver = self._session.receiver(str(locate_addr) +
+ # for sending directly addressed messages.
+ self._direct_sender = self._session.sender(str(self._address.get_node()) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ logging.debug("my default direct send addr=%s" % self._direct_sender.target)
+
+ # for receiving "broadcast" messages from consoles
+ default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#",
+ self._domain)
+ self._topic_receiver = self._session.receiver(str(default_addr) +
";{create:always,"
" node-properties:"
" {type:topic}}",
capacity=self._capacity)
- logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
-
+ logging.debug("console.ind addr=%s" % self._topic_receiver.source)
- ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- self._ind_sender = self._session.sender(str(ind_addr) +
+ # for sending to topic subscribers
+ ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND,
+ self._domain)
+ self._topic_sender = self._session.sender(str(ind_addr) +
";{create:always,"
" node-properties:"
" {type:topic}}")
- logging.debug("agent.ind addr=%s" % self._ind_sender.target)
-
- my_events = QmfAddress.topic(self.name, self._domain)
- self._event_sender = self._session.sender(str(my_events) +
- ";{create:always,"
- " node-properties:"
- " {type:topic}}")
- logging.debug("my event addr=%s" % self._event_sender.target)
+ logging.debug("agent.ind addr=%s" % self._topic_sender.target)
self._running = True
self.start()
@@ -168,12 +172,15 @@ class Agent(Thread):
self._running = False
if self.isAlive():
# kick my thread to wake it up
- my_addr = QmfAddress.direct(self.name, self._domain)
- logging.debug("Making temp sender for [%s]" % str(my_addr))
- tmp_sender = self._session.sender(str(my_addr))
try:
- msg = Message(subject=makeSubject(OpCode.noop))
- tmp_sender.send( msg, sync=True )
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.noop)},
+ subject=self.name,
+ content={"noop":"noop"})
+
+ # TRACE
+ #logging.error("!!! sending wakeup to myself: %s" % msg)
+ self._direct_sender.send( msg, sync=True )
except SendError, e:
logging.error(str(e))
logging.debug("waiting for agent receiver thread to exit")
@@ -182,12 +189,12 @@ class Agent(Thread):
logging.error( "Agent thread '%s' is hung..." % self.name)
self._direct_receiver.close()
self._direct_receiver = None
- self._locate_receiver.close()
- self._locate_receiver = None
- self._ind_sender.close()
- self._ind_sender = None
- self._event_sender.close()
- self._event_sender = None
+ self._direct_sender.close()
+ self._direct_sender = None
+ self._topic_receiver.close()
+ self._topic_receiver = None
+ self._topic_sender.close()
+ self._topic_sender = None
self._session.close()
self._session = None
self._conn = None
@@ -224,16 +231,21 @@ class Agent(Thread):
"""
TBD
"""
- if not self._event_sender:
+ if not self._topic_sender:
raise Exception("No connection available")
# @todo: should we validate against the schema?
_map = {"_name": self.get_name(),
"_event": qmfEvent.map_encode()}
- msg = Message(subject=makeSubject(OpCode.event_ind),
- properties={"method":"response"},
+ msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ qmfEvent.get_severity() + "." + self.name,
+ properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.event_ind)},
content={MsgKey.event:_map})
- self._event_sender.send(msg)
+ # TRACE
+ # logging.error("!!! Agent %s sending Event (%s)" %
+ # (self.name, str(msg)))
+ self._topic_sender.send(msg)
def add_object(self, data ):
"""
@@ -279,17 +291,12 @@ class Agent(Thread):
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message(subject=makeSubject(OpCode.response),
- properties={"method":"response"},
- content={MsgKey.method:_map})
+ msg = Message( properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.response)},
+ content={MsgKey.method:_map})
msg.correlation_id = handle.correlation_id
- try:
- tmp_snd = self._session.sender( handle.reply_to )
- tmp_snd.send(msg)
- logging.debug("method-response sent to [%s]" % handle.reply_to)
- except SendError, e:
- logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
+ self._send_reply(msg, handle.reply_to)
def get_workitem_count(self):
"""
@@ -324,7 +331,12 @@ class Agent(Thread):
now = datetime.datetime.utcnow()
# print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
if now >= next_heartbeat:
- self._ind_sender.send(self._makeAgentIndMsg())
+ ind = self._makeAgentIndMsg()
+ ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
@@ -337,19 +349,23 @@ class Agent(Thread):
for i in range(batch_limit):
try:
- msg = self._locate_receiver.fetch(timeout=0)
+ msg = self._topic_receiver.fetch(timeout=0)
except Empty:
break
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
+ # TRACE
+ # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._topic_receiver.source, msg))
+ self._dispatch(msg, _direct=False)
for i in range(batch_limit):
try:
msg = self._direct_receiver.fetch(timeout=0)
except Empty:
break
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
+ # TRACE
+ # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._direct_receiver.source, msg))
+ self._dispatch(msg, _direct=True)
if self._work_q_put and self._notifier:
# new stuff on work queue, kick the the application...
@@ -369,9 +385,37 @@ class Agent(Thread):
"""
_map = {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message( subject=makeSubject(OpCode.agent_ind),
- properties={"method":"response"},
- content={MsgKey.agent_info: _map})
+ return Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.agent_ind)},
+ content={MsgKey.agent_info: _map})
+
+ def _send_reply(self, msg, reply_to):
+ """
+ Send a reply message to the given reply_to address
+ """
+ if not isinstance(reply_to, QmfAddress):
+ try:
+ reply_to = QmfAddress.from_string(str(reply_to))
+ except ValueError:
+ logging.error("Invalid reply-to address '%s'" %
+ handle.reply_to)
+
+ msg.subject = reply_to.get_subject()
+
+ try:
+ if reply_to.is_direct():
+ # TRACE
+ #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._direct_sender.send(msg)
+ else:
+ # TRACE
+ # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._topic_sender.send(msg)
+ logging.debug("reply msg sent to [%s]" % str(reply_to))
+ except SendError, e:
+ logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
def _dispatch(self, msg, _direct=False):
@@ -382,9 +426,9 @@ class Agent(Thread):
"""
logging.debug( "Message received from Console! [%s]" % msg )
try:
- version,opcode = parseSubject(msg.subject)
+ version,opcode = parseSubject(msg.properties.get("qmf.subject"))
except:
- logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+ logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
return
cmap = {}; props={}
@@ -425,17 +469,12 @@ class Agent(Thread):
if query is not None:
# fake a QmfData containing my identifier for the query compare
tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
- reply = QmfQuery(query).evaluate(tmpData)
+ reply = QmfQuery.from_map(query).evaluate(tmpData)
if reply:
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- m = self._makeAgentIndMsg()
- m.correlation_id = msg.correlation_id
- tmp_snd.send(m)
- logging.debug("agent-ind sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e)))
+ m = self._makeAgentIndMsg()
+ m.correlation_id = msg.correlation_id
+ self._send_reply(m, msg.reply_to)
else:
logging.debug("agent-locate msg not mine - no reply sent")
@@ -481,13 +520,6 @@ class Agent(Thread):
in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
oid = cmap.get(QmfData.KEY_OBJECT_ID)
-
- print("!!! ci=%s rt=%s mn=%s oid=%s" %
- (msg.correlation_id,
- msg.reply_to,
- mname,
- oid))
-
handle = _MethodCallHandle(msg.correlation_id,
msg.reply_to,
mname,
@@ -509,18 +541,12 @@ class Agent(Thread):
finally:
self._lock.release()
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content={MsgKey.package_info: pnames} )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
- tmp_snd.send(m)
- logging.debug("package_info sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
-
+ m = Message(properties={"qmf.subject":makeSubject(OpCode.data_ind),
+ "method":"response"},
+ content={MsgKey.package_info: pnames} )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ self._send_reply(m, msg.reply_to)
def _querySchema( self, msg, query, _idOnly=False ):
"""
@@ -551,24 +577,18 @@ class Agent(Thread):
finally:
self._lock.release()
-
- tmp_snd = self._session.sender( msg.reply_to )
-
if _idOnly:
content = {MsgKey.schema_id: schemas}
else:
content = {MsgKey.schema:schemas}
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
+ m = Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.data_ind)},
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
- try:
- tmp_snd.send(m)
- logging.debug("schema_id sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+ self._send_reply(m, msg.reply_to)
def _queryData( self, msg, query, _idOnly=False ):
@@ -600,23 +620,18 @@ class Agent(Thread):
finally:
self._lock.release()
- tmp_snd = self._session.sender( msg.reply_to )
-
if _idOnly:
content = {MsgKey.object_id:data_objs}
else:
content = {MsgKey.data_obj:data_objs}
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
+ m = Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.data_ind)},
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
- try:
- tmp_snd.send(m)
- logging.debug("data reply sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+ self._send_reply(m, msg.reply_to)
##==============================================================================
diff --git a/python/qmf2/common.py b/python/qmf2/common.py
index 061c9fbe78..3679deeb2e 100644
--- a/python/qmf2/common.py
+++ b/python/qmf2/common.py
@@ -34,15 +34,6 @@ except ImportError:
## Constants
##
-
-AMQP_QMF_AGENT_LOCATE = "agent.locate"
-AMQP_QMF_AGENT_INDICATION = "agent.ind"
-AMQP_QMF_AGENT_EVENT="agent.event"
-# agent.ind[.<agent-name>]
-# agent.event.<sev>.<agent-name>
-# sev="strings"
-#
-
AMQP_QMF_SUBJECT = "qmf"
AMQP_QMF_VERSION = 4
AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
@@ -168,38 +159,102 @@ class WorkItem(object):
class QmfAddress(object):
"""
+ Address format: "qmf.<domain>.[topic|direct]/<subject>"
TBD
"""
+
TYPE_DIRECT = "direct"
TYPE_TOPIC = "topic"
ADDRESS_FMT = "qmf.%s.%s/%s"
DEFAULT_DOMAIN = "default"
+ # Directly-addressed messages:
+ # agent's direct address: "qmf.<domain>.direct/<agent-name>
+ # console's direct address: "qmf.<domain>.direct/<console-name>
- def __init__(self, name, domain, type_):
- self._name = name
+ # Well-known Topic Addresses:
+ # "qmf.<domain>.topic/<subject>
+ # Where <subject> has the following format:
+ # "console.ind#" - indications sent from consoles
+ # "agent.ind#" - indications sent from agents
+ #
+ # The following "well known" subjects are defined:
+ #
+ # console.ind.locate[.<agent-name>] - agent discovery request
+ # agent.ind.heartbeat[.<agent-name>"] - agent heartbeats
+ # agent.ind.event[.<severity>.<agent-name>] - events
+ # agent.ind.schema[TBD] - schema updates
+ #
+ SUBJECT_AGENT_IND="agent.ind"
+ SUBJECT_AGENT_HEARTBEAT = "agent.ind.heartbeat"
+ SUBJECT_AGENT_EVENT="agent.ind.event"
+ SUBJECT_AGENT_SCHEMA="agent.ind.schema"
+
+ SUBJECT_CONSOLE_IND="console.ind"
+ SUBJECT_CONSOLE_LOCATE_AGENT="console.ind.locate"
+
+
+
+ def __init__(self, subject, domain, type_):
+ if '/' in domain or '.' in domain:
+ raise Exception("domain string must not contain '/' or '.'"
+ " characters.")
+
+ self._subject = subject
self._domain = domain
self._type = type_
- def _direct(cls, name, _domain=None):
+ def _direct(cls, subject, _domain=None):
if _domain is None:
_domain = QmfAddress.DEFAULT_DOMAIN
- return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT)
+ return cls(subject, _domain, type_=QmfAddress.TYPE_DIRECT)
direct = classmethod(_direct)
- def _topic(cls, name, _domain=None):
+ def _topic(cls, subject, _domain=None):
if _domain is None:
_domain = QmfAddress.DEFAULT_DOMAIN
- return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC)
+ return cls(subject, _domain, type_=QmfAddress.TYPE_TOPIC)
topic = classmethod(_topic)
+ def __from_string(cls, address):
+ node,subject = address.split('/',1)
+ qmf,domain,type_ = node.split('.',2)
+
+ if qmf != "qmf" or (type_ != QmfAddress.TYPE_DIRECT and
+ type_ != QmfAddress.TYPE_TOPIC):
+ raise ValueError("invalid QmfAddress format: %s" % address)
+
+ return cls(subject, domain, type_)
+ from_string = classmethod(__from_string)
def get_address(self):
+ """
+ Return the QMF address as a string, suitable for use with the AMQP
+ messaging API.
+ """
return str(self)
+ def get_node(self):
+ """
+ Return the 'node' portion of the address.
+ """
+ return self.get_address().split('/',1)[0]
+
+ def get_subject(self):
+ """
+ Return the 'subject' portion of the address.
+ """
+ return self.get_address().split('/',1)[1]
+
+ def get_domain(self):
+ return self._domain
+
+ def is_direct(self):
+ return self._type == self.TYPE_DIRECT
+
def __repr__(self):
- return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name)
+ return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._subject)
@@ -345,14 +400,14 @@ class QmfData(_mapEncoder):
# not override it!
self._schema = None
- def _create(cls, values, _subtypes={}, _tag=None, _object_id=None,
+ def __create(cls, values, _subtypes={}, _tag=None, _object_id=None,
_schema=None, _const=False):
# timestamp in millisec since epoch UTC
ctime = long(time.time() * 1000)
return cls(_values=values, _subtypes=_subtypes, _tag=_tag,
_ctime=ctime, _utime=ctime,
_object_id=_object_id, _schema=_schema, _const=_const)
- create = classmethod(_create)
+ create = classmethod(__create)
def __from_map(cls, map_, _schema=None, _const=False):
return cls(_map=map_, _schema=_schema, _const=_const)
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 8e8f4799f7..6e469a24ac 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -32,12 +32,10 @@ from qpid.messaging import Connection, Message, Empty, SendError
from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
- AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
SchemaClass, SchemaClassId, SchemaEventClass,
SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
-
# global flag that indicates which thread (if any) is
# running the console notifier callback
_callback_thread=None
@@ -280,11 +278,9 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- if _in_args:
- _in_args = _in_args.copy()
-
if self._schema:
# validate
+ _in_args = _in_args.copy()
ms = self._schema.get_method(name)
if ms is None:
raise ValueError("Method '%s' is undefined." % name)
@@ -410,6 +406,9 @@ class Agent(object):
# msg.correlation_id = str(handle)
if correlation_id:
msg.correlation_id = str(correlation_id)
+ # TRACE
+ #logging.error("!!! Console %s sending to agent %s (%s)" %
+ # (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
@@ -530,8 +529,8 @@ class Agent(object):
def _sendQuery(self, query, correlation_id=None):
"""
"""
- msg = Message(subject=makeSubject(OpCode.get_query),
- properties={"method":"request"},
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.get_query)},
content={MsgKey.query: query.map_encode()})
self._sendMsg( msg, correlation_id )
@@ -539,8 +538,8 @@ class Agent(object):
def _sendMethodReq(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(subject=makeSubject(OpCode.method_req),
- properties={"method":"request"},
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.method_req)},
content=mr_map)
self._sendMsg( msg, correlation_id )
@@ -664,6 +663,8 @@ class Console(Thread):
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
+
+ # for messages directly addressed to me
self._direct_recvr = self._session.receiver(str(self._address) +
";{create:always,"
" node-properties:"
@@ -673,18 +674,30 @@ class Console(Thread):
capacity=1)
logging.debug("my direct addr=%s" % self._direct_recvr.source)
- ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- self._announce_recvr = self._session.receiver(str(ind_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}",
- capacity=1)
- logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
-
- locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- self._locate_sender = self._session.sender(str(locate_addr) +
+ self._direct_sender = self._session.sender(str(self._address.get_node()) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ logging.debug("my direct sender=%s" % self._direct_sender.target)
+
+ # for receiving "broadcast" messages from agents
+ default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
+ self._domain)
+ self._topic_recvr = self._session.receiver(str(default_addr) +
";{create:always,"
- " node-properties:{type:topic}}")
- logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+ " node-properties:{type:topic}}",
+ capacity=1)
+ logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
+
+
+ # for sending to topic subscribers
+ topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain)
+ self._topic_sender = self._session.sender(str(topic_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}")
+ logging.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
@@ -709,11 +722,13 @@ class Console(Thread):
self._operational = False
if self.isAlive():
# kick my thread to wake it up
- logging.debug("Making temp sender for [%s]" % self._address)
- tmp_sender = self._session.sender(str(self._address))
+ logging.debug("Sending noop to wake up [%s]" % self._address)
try:
- msg = Message(subject=makeSubject(OpCode.noop))
- tmp_sender.send( msg, sync=True )
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.noop)},
+ subject=self._name,
+ content={"noop":"noop"})
+ self._direct_sender.send( msg, sync=True )
except SendError, e:
logging.error(str(e))
logging.debug("waiting for console receiver thread to exit")
@@ -721,15 +736,16 @@ class Console(Thread):
if self.isAlive():
logging.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
- self._announce_recvr.close()
- self._locate_sender.close()
+ self._direct_sender.close()
+ self._topic_recvr.close()
+ self._topic_sender.close()
self._session.close()
self._session = None
self._conn = None
logging.debug("console connection removal complete")
- def getAddress(self):
+ def get_address(self):
"""
The AMQP address this Console is listening to.
"""
@@ -752,7 +768,7 @@ class Console(Thread):
def find_agent(self, name, timeout=None ):
"""
- Given the name of a particular agent, return an instance of class Agent
+ Given the name of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
"""
@@ -769,23 +785,20 @@ class Console(Thread):
handle = self._req_correlation.allocate()
if handle == 0:
raise Exception("Can not allocate a correlation id!")
+
+ query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
+ msg = Message(subject="console.ind.locate." + name,
+ properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.agent_locate)},
+ content={MsgKey.query: query.map_encode()})
+ msg.reply_to = str(self._address)
+ msg.correlation_id = str(handle)
+ logging.debug("Sending Agent Locate (%s)" % time.time())
+ # TRACE
+ #logging.error("!!! Console %s sending agent locate (%s)" %
+ # (self._name, str(msg)))
try:
- tmp_sender = self._session.sender(str(QmfAddress.direct(name,
- self._domain))
- + ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
-
- query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject=makeSubject(OpCode.agent_locate),
- properties={"method":"request"},
- content={MsgKey.query: query.map_encode()})
- msg.reply_to = str(self._address)
- msg.correlation_id = str(handle)
- logging.debug("Sending Agent Locate (%s)" % time.time())
- tmp_sender.send( msg )
+ self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
self._req_correlation.release(handle)
@@ -807,6 +820,30 @@ class Console(Thread):
return new_agent
+ def get_agents(self):
+ """
+ Return the list of known agents.
+ """
+ self._lock.acquire()
+ try:
+ agents = self._agent_map.values()
+ finally:
+ self._lock.release()
+ return agents
+
+
+ def get_agent(self, name):
+ """
+ Return the named agent, else None if not currently available.
+ """
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ finally:
+ self._lock.release()
+ return agent
+
+
def doQuery(self, agent, query, timeout=None ):
"""
"""
@@ -898,9 +935,12 @@ class Console(Thread):
while True:
try:
- msg = self._announce_recvr.fetch(timeout=0)
+ msg = self._topic_recvr.fetch(timeout=0)
except Empty:
break
+ # TRACE:
+ # logging.error("!!! Console %s: msg on %s [%s]" %
+ # (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
while True:
@@ -908,16 +948,11 @@ class Console(Thread):
msg = self._direct_recvr.fetch(timeout = 0)
except Empty:
break
+ # TRACE
+ #logging.error("!!! Console %s: msg on %s [%s]" %
+ # (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
- for agent in self._agent_map.itervalues():
- try:
- msg = agent._event_recvr.fetch(timeout = 0)
- except Empty:
- continue
- self._dispatch(msg, _direct=False)
-
-
self._expireAgents() # check for expired agents
#if qLen == 0 and self._work_q.qsize() and self._notifier:
@@ -1026,10 +1061,10 @@ class Console(Thread):
"""
logging.debug( "Message received from Agent! [%s]" % msg )
try:
- version,opcode = parseSubject(msg.subject)
+ version,opcode = parseSubject(msg.properties.get("qmf.subject"))
# @todo: deal with version mismatch!!!
except:
- logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject)
+ logging.error("Ignoring unrecognized message '%s'" % msg)
return
cmap = {}; props = {}
@@ -1171,8 +1206,13 @@ class Console(Thread):
if not emap:
logging.debug("No '_event' field in event indication message.")
return
- # @todo: do I need to lock this???
- agent = self._agent_map.get(aname)
+
+ agent = None
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(aname)
+ finally:
+ self._lock.release()
if not agent:
logging.debug("Agent '%s' not known." % aname)
return
@@ -1251,17 +1291,6 @@ class Console(Thread):
return None
logging.debug("created agent sender %s" % agent._sender.target)
- events_addr = QmfAddress.topic(name, self._domain)
- try:
- agent._event_recvr = self._session.receiver(str(events_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}",
- capacity=1)
- except:
- logging.warning("Unable to create event receiver for %s" % name)
- return None
- logging.debug("created agent event receiver %s" % agent._event_recvr.source)
-
self._agent_map[name] = agent
finally:
self._lock.release()
@@ -1372,7 +1401,8 @@ class Console(Thread):
else:
return None
-
+ def __repr__(self):
+ return str(self._address)
# def get_packages(self):
# plist = []
diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py
index 2e742b79be..e942c2fbc2 100644
--- a/python/qmf2/tests/__init__.py
+++ b/python/qmf2/tests/__init__.py
@@ -19,4 +19,8 @@
# under the License.
#
-import agent_discovery, basic_query, basic_method, obj_gets, events
+import agent_discovery
+import basic_query
+import basic_method
+import obj_gets
+import events
diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py
index 3c530cc060..e820c2c839 100644
--- a/python/qmf2/tests/agent_discovery.py
+++ b/python/qmf2/tests/agent_discovery.py
@@ -17,6 +17,7 @@
#
import unittest
import logging
+import time
from threading import Thread, Event
import qpid.messaging
@@ -45,40 +46,39 @@ class _testNotifier(qmf2.common.Notifier):
class _agentApp(Thread):
- def __init__(self, name, heartbeat):
+ def __init__(self, name, broker_url, heartbeat):
Thread.__init__(self)
+ self.timeout = 3
+ self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
# No database needed for this test
+ self.running = False
+
+ def start_app(self):
self.running = True
self.start()
- def connect_agent(self, broker_url):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(broker_url.host,
- broker_url.port,
- broker_url.user,
- broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
-
- def disconnect_agent(self, timeout):
- if self.conn:
- self.agent.remove_connection(timeout)
-
- def shutdown_agent(self, timeout):
- self.agent.destroy(timeout)
-
- def stop(self):
+ def stop_app(self):
self.running = False
+ # wake main thread
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
if self.isAlive():
- logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
+ # Connect the agent to the broker,
+ # broker_url = "user/passwd@hostname:port"
+ conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ conn.connect()
+ self.agent.set_connection(conn)
+
while self.running:
self.notifier.wait_for_work(None)
wi = self.agent.get_next_workitem(timeout=0)
@@ -87,6 +87,9 @@ class _agentApp(Thread):
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
+ # done, cleanup agent
+ self.agent.remove_connection(self.timeout)
+ self.agent.destroy(self.timeout)
class BaseTest(unittest.TestCase):
@@ -97,26 +100,27 @@ class BaseTest(unittest.TestCase):
def setUp(self):
# one second agent indication interval
- self.agent1 = _agentApp("agent1", 1)
- self.agent1.connect_agent(self.broker)
- self.agent2 = _agentApp("agent2", 1)
- self.agent2.connect_agent(self.broker)
+ self.agent_heartbeat = 1
+ self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+ self.agent2.start_app()
def tearDown(self):
if self.agent1:
- self.agent1.shutdown_agent(10)
- self.agent1.stop()
+ self.agent1.stop_app()
self.agent1 = None
if self.agent2:
- self.agent2.shutdown_agent(10)
- self.agent2.stop()
+ self.agent2.stop_app()
self.agent2 = None
def test_discover_all(self):
- # create console
- # enable agent discovery
- # wait
- # expect agent add for agent1 and agent2
+ """
+ create console
+ enable agent discovery
+ wait
+ expect agent add for agent1 and agent2
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -154,10 +158,12 @@ class BaseTest(unittest.TestCase):
def test_discover_one(self):
- # create console
- # enable agent discovery, filter for agent1 only
- # wait until timeout
- # expect agent add for agent1 only
+ """
+ create console
+ enable agent discovery, filter for agent1 only
+ wait until timeout
+ expect agent add for agent1 only
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -198,10 +204,12 @@ class BaseTest(unittest.TestCase):
def test_heartbeat(self):
- # create console with 2 sec agent timeout
- # enable agent discovery, find all agents
- # stop agent1, expect timeout notification
- # stop agent2, expect timeout notification
+ """
+ create console with 2 sec agent timeout
+ enable agent discovery, find all agents
+ stop agent1, expect timeout notification
+ stop agent2, expect timeout notification
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=2)
@@ -239,8 +247,7 @@ class BaseTest(unittest.TestCase):
agent1 = self.agent1
self.agent1 = None
- agent1.shutdown_agent(10)
- agent1.stop()
+ agent1.stop_app()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
@@ -265,8 +272,7 @@ class BaseTest(unittest.TestCase):
agent2 = self.agent2
self.agent2 = None
- agent2.shutdown_agent(10)
- agent2.stop()
+ agent2.stop_app()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
@@ -291,11 +297,13 @@ class BaseTest(unittest.TestCase):
def test_find_agent(self):
- # create console
- # do not enable agent discovery
- # find agent1, expect success
- # find agent-none, expect failure
- # find agent2, expect success
+ """
+ create console
+ do not enable agent discovery
+ find agent1, expect success
+ find agent-none, expect failure
+ find agent2, expect success
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier)
self.conn = qpid.messaging.Connection(self.broker.host,
@@ -318,3 +326,154 @@ class BaseTest(unittest.TestCase):
self.console.destroy(10)
+ def test_heartbeat_x2(self):
+ """
+ create 2 consoles with 2 sec agent timeout
+ enable agent discovery, find all agents
+ stop agent1, expect timeout notification on both consoles
+ stop agent2, expect timeout notification on both consoles
+ """
+ console_count = 2
+ self.consoles = []
+ for i in range(console_count):
+ console = qmf2.console.Console("test-console-" + str(i),
+ notifier=_testNotifier(),
+ agent_timeout=2)
+ conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ conn.connect()
+ console.addConnection(conn)
+ console.enable_agent_discovery()
+ self.consoles.append(console)
+
+ # now wait for all consoles to discover all agents,
+ # agents send a heartbeat once a second
+ for console in self.consoles:
+ agent1_found = agent2_found = False
+ wi = console.get_next_workitem(timeout=2)
+ while wi and not (agent1_found and agent2_found):
+ if wi.get_type() == wi.AGENT_ADDED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = True
+ elif agent.get_name() == "agent2":
+ agent2_found = True
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_found and agent2_found:
+ break;
+ wi = console.get_next_workitem(timeout=2)
+
+ self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+ # now kill agent1 and wait for expiration
+
+ agent1 = self.agent1
+ self.agent1 = None
+ agent1.stop_app()
+
+ for console in self.consoles:
+ agent1_found = True
+ wi = console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = False
+ break
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+
+ wi = console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent1_found, "agent1 did not delete!")
+
+ # now kill agent2 and wait for expiration
+
+ agent2 = self.agent2
+ self.agent2 = None
+ agent2.stop_app()
+
+ for console in self.consoles:
+ agent2_found = True
+ wi = console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent2":
+ agent2_found = False
+ break
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+
+ wi = console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent2_found, "agent2 did not delete!")
+
+
+ for console in self.consoles:
+ console.destroy(10)
+
+
+ def test_find_agent_x2(self):
+ """
+ create 2 consoles, do not enable agent discovery
+ console-1: find agent1, expect success
+ console-2: find agent2, expect success
+ Verify console-1 does -not- know agent2
+ Verify console-2 does -not- know agent1
+ """
+ console_count = 2
+ self.consoles = []
+ for i in range(console_count):
+ console = qmf2.console.Console("test-console-" + str(i),
+ notifier=_testNotifier(),
+ agent_timeout=2)
+ conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ conn.connect()
+ console.addConnection(conn)
+ self.consoles.append(console)
+
+ agent1 = self.consoles[0].find_agent("agent1", timeout=3)
+ self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+ agent2 = self.consoles[1].find_agent("agent2", timeout=3)
+ self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+ # wait long enough for agent heartbeats to be sent...
+
+ time.sleep(self.agent_heartbeat * 2)
+
+ agents = self.consoles[0].get_agents()
+ self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1")
+ agent1 = self.consoles[0].get_agent("agent1")
+ self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+
+ agents = self.consoles[1].get_agents()
+ self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2")
+ agent2 = self.consoles[1].get_agent("agent2")
+ self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+ # verify no new agents were learned
+
+ for console in self.consoles:
+ console.destroy(10)
+
diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py
index b2d934728d..171cb80aaa 100644
--- a/python/qmf2/tests/events.py
+++ b/python/qmf2/tests/events.py
@@ -22,6 +22,7 @@ import logging
from threading import Thread, Event
import qpid.messaging
+from qpid.harness import Skipped
from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
QmfData, QmfQueryPredicate, SchemaEventClass,
@@ -93,7 +94,11 @@ class _agentApp(Thread):
self.broker_url.port,
self.broker_url.user,
self.broker_url.password)
- conn.connect()
+ try:
+ conn.connect()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
self.agent.set_connection(conn)
counter = 1
@@ -150,12 +155,18 @@ class BaseTest(unittest.TestCase):
self.broker.port,
self.broker.user,
self.broker.password)
- self.conn.connect()
+ try:
+ self.conn.connect()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
self.console.addConnection(self.conn)
# find the agents
for aname in ["agent1", "agent2"]:
+ print("!!! finding aname=%s (%s)" % (aname, time.time()))
agent = self.console.find_agent(aname, timeout=3)
+ print("!!! agent=%s aname=%s (%s)" % (agent, aname, time.time()))
self.assertTrue(agent and agent.get_name() == aname)
# now wait for events