diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qmf2/agent.py | 231 | ||||
-rw-r--r-- | python/qmf2/common.py | 91 | ||||
-rw-r--r-- | python/qmf2/console.py | 168 | ||||
-rw-r--r-- | python/qmf2/tests/__init__.py | 6 | ||||
-rw-r--r-- | python/qmf2/tests/agent_discovery.py | 255 | ||||
-rw-r--r-- | python/qmf2/tests/events.py | 15 |
6 files changed, 520 insertions, 246 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py index c6a518ca31..88aee8034f 100644 --- a/python/qmf2/agent.py +++ b/python/qmf2/agent.py @@ -1,4 +1,3 @@ - # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -25,10 +24,9 @@ import Queue from threading import Thread, Lock, currentThread from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, - makeSubject, parseSubject, OpCode, QmfQuery, - SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod) +from common import (makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) # global flag that indicates which thread (if any) is # running the agent notifier callback @@ -85,6 +83,7 @@ class Agent(Thread): self.name = str(name) self._domain = _domain + self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier self._heartbeat_interval = _heartbeat_interval self._max_msg_size = _max_msg_size @@ -93,9 +92,9 @@ class Agent(Thread): self._conn = None self._session = None self._direct_receiver = None - self._locate_receiver = None - self._ind_sender = None - self._event_sender = None + self._topic_receiver = None + self._direct_sender = None + self._topic_sender = None self._lock = Lock() self._packages = {} @@ -127,8 +126,8 @@ class Agent(Thread): self._conn = conn self._session = self._conn.session() - my_addr = QmfAddress.direct(self.name, self._domain) - self._direct_receiver = self._session.receiver(str(my_addr) + + # for messages directly addressed to me + self._direct_receiver = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" " {type:topic," @@ -137,28 +136,33 @@ class Agent(Thread): capacity=self._capacity) logging.debug("my direct addr=%s" % self._direct_receiver.source) - locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - self._locate_receiver = self._session.receiver(str(locate_addr) + + # for sending directly addressed messages. + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + logging.debug("my default direct send addr=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from consoles + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#", + self._domain) + self._topic_receiver = self._session.receiver(str(default_addr) + ";{create:always," " node-properties:" " {type:topic}}", capacity=self._capacity) - logging.debug("agent.locate addr=%s" % self._locate_receiver.source) - + logging.debug("console.ind addr=%s" % self._topic_receiver.source) - ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - self._ind_sender = self._session.sender(str(ind_addr) + + # for sending to topic subscribers + ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND, + self._domain) + self._topic_sender = self._session.sender(str(ind_addr) + ";{create:always," " node-properties:" " {type:topic}}") - logging.debug("agent.ind addr=%s" % self._ind_sender.target) - - my_events = QmfAddress.topic(self.name, self._domain) - self._event_sender = self._session.sender(str(my_events) + - ";{create:always," - " node-properties:" - " {type:topic}}") - logging.debug("my event addr=%s" % self._event_sender.target) + logging.debug("agent.ind addr=%s" % self._topic_sender.target) self._running = True self.start() @@ -168,12 +172,15 @@ class Agent(Thread): self._running = False if self.isAlive(): # kick my thread to wake it up - my_addr = QmfAddress.direct(self.name, self._domain) - logging.debug("Making temp sender for [%s]" % str(my_addr)) - tmp_sender = self._session.sender(str(my_addr)) try: - msg = Message(subject=makeSubject(OpCode.noop)) - tmp_sender.send( msg, sync=True ) + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.noop)}, + subject=self.name, + content={"noop":"noop"}) + + # TRACE + #logging.error("!!! sending wakeup to myself: %s" % msg) + self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) logging.debug("waiting for agent receiver thread to exit") @@ -182,12 +189,12 @@ class Agent(Thread): logging.error( "Agent thread '%s' is hung..." % self.name) self._direct_receiver.close() self._direct_receiver = None - self._locate_receiver.close() - self._locate_receiver = None - self._ind_sender.close() - self._ind_sender = None - self._event_sender.close() - self._event_sender = None + self._direct_sender.close() + self._direct_sender = None + self._topic_receiver.close() + self._topic_receiver = None + self._topic_sender.close() + self._topic_sender = None self._session.close() self._session = None self._conn = None @@ -224,16 +231,21 @@ class Agent(Thread): """ TBD """ - if not self._event_sender: + if not self._topic_sender: raise Exception("No connection available") # @todo: should we validate against the schema? _map = {"_name": self.get_name(), "_event": qmfEvent.map_encode()} - msg = Message(subject=makeSubject(OpCode.event_ind), - properties={"method":"response"}, + msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + + qmfEvent.get_severity() + "." + self.name, + properties={"method":"response", + "qmf.subject":makeSubject(OpCode.event_ind)}, content={MsgKey.event:_map}) - self._event_sender.send(msg) + # TRACE + # logging.error("!!! Agent %s sending Event (%s)" % + # (self.name, str(msg))) + self._topic_sender.send(msg) def add_object(self, data ): """ @@ -279,17 +291,12 @@ class Agent(Thread): raise TypeError("Invalid type for error - must be QmfData") _map[SchemaMethod.KEY_ERROR] = _error.map_encode() - msg = Message(subject=makeSubject(OpCode.response), - properties={"method":"response"}, - content={MsgKey.method:_map}) + msg = Message( properties={"method":"response", + "qmf.subject":makeSubject(OpCode.response)}, + content={MsgKey.method:_map}) msg.correlation_id = handle.correlation_id - try: - tmp_snd = self._session.sender( handle.reply_to ) - tmp_snd.send(msg) - logging.debug("method-response sent to [%s]" % handle.reply_to) - except SendError, e: - logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) + self._send_reply(msg, handle.reply_to) def get_workitem_count(self): """ @@ -324,7 +331,12 @@ class Agent(Thread): now = datetime.datetime.utcnow() # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) if now >= next_heartbeat: - self._ind_sender.send(self._makeAgentIndMsg()) + ind = self._makeAgentIndMsg() + ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT + # TRACE + #logging.error("!!! Agent %s sending Heartbeat (%s)" % + # (self.name, str(ind))) + self._topic_sender.send(ind) logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) @@ -337,19 +349,23 @@ class Agent(Thread): for i in range(batch_limit): try: - msg = self._locate_receiver.fetch(timeout=0) + msg = self._topic_receiver.fetch(timeout=0) except Empty: break - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) + # TRACE + # logging.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._topic_receiver.source, msg)) + self._dispatch(msg, _direct=False) for i in range(batch_limit): try: msg = self._direct_receiver.fetch(timeout=0) except Empty: break - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) + # TRACE + # logging.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._direct_receiver.source, msg)) + self._dispatch(msg, _direct=True) if self._work_q_put and self._notifier: # new stuff on work queue, kick the the application... @@ -369,9 +385,37 @@ class Agent(Thread): """ _map = {"_name": self.get_name(), "_schema_timestamp": self._schema_timestamp} - return Message( subject=makeSubject(OpCode.agent_ind), - properties={"method":"response"}, - content={MsgKey.agent_info: _map}) + return Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.agent_ind)}, + content={MsgKey.agent_info: _map}) + + def _send_reply(self, msg, reply_to): + """ + Send a reply message to the given reply_to address + """ + if not isinstance(reply_to, QmfAddress): + try: + reply_to = QmfAddress.from_string(str(reply_to)) + except ValueError: + logging.error("Invalid reply-to address '%s'" % + handle.reply_to) + + msg.subject = reply_to.get_subject() + + try: + if reply_to.is_direct(): + # TRACE + #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._direct_sender.send(msg) + else: + # TRACE + # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._topic_sender.send(msg) + logging.debug("reply msg sent to [%s]" % str(reply_to)) + except SendError, e: + logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) def _dispatch(self, msg, _direct=False): @@ -382,9 +426,9 @@ class Agent(Thread): """ logging.debug( "Message received from Console! [%s]" % msg ) try: - version,opcode = parseSubject(msg.subject) + version,opcode = parseSubject(msg.properties.get("qmf.subject")) except: - logging.debug("Ignoring unrecognized message '%s'" % msg.subject) + logging.warning("Ignoring unrecognized message '%s'" % msg.subject) return cmap = {}; props={} @@ -425,17 +469,12 @@ class Agent(Thread): if query is not None: # fake a QmfData containing my identifier for the query compare tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) - reply = QmfQuery(query).evaluate(tmpData) + reply = QmfQuery.from_map(query).evaluate(tmpData) if reply: - try: - tmp_snd = self._session.sender( msg.reply_to ) - m = self._makeAgentIndMsg() - m.correlation_id = msg.correlation_id - tmp_snd.send(m) - logging.debug("agent-ind sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e))) + m = self._makeAgentIndMsg() + m.correlation_id = msg.correlation_id + self._send_reply(m, msg.reply_to) else: logging.debug("agent-locate msg not mine - no reply sent") @@ -481,13 +520,6 @@ class Agent(Thread): in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) oid = cmap.get(QmfData.KEY_OBJECT_ID) - - print("!!! ci=%s rt=%s mn=%s oid=%s" % - (msg.correlation_id, - msg.reply_to, - mname, - oid)) - handle = _MethodCallHandle(msg.correlation_id, msg.reply_to, mname, @@ -509,18 +541,12 @@ class Agent(Thread): finally: self._lock.release() - try: - tmp_snd = self._session.sender( msg.reply_to ) - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content={MsgKey.package_info: pnames} ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id - tmp_snd.send(m) - logging.debug("package_info sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) - + m = Message(properties={"qmf.subject":makeSubject(OpCode.data_ind), + "method":"response"}, + content={MsgKey.package_info: pnames} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + self._send_reply(m, msg.reply_to) def _querySchema( self, msg, query, _idOnly=False ): """ @@ -551,24 +577,18 @@ class Agent(Thread): finally: self._lock.release() - - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: content = {MsgKey.schema_id: schemas} else: content = {MsgKey.schema:schemas} - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) + m = Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.data_ind)}, + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id - try: - tmp_snd.send(m) - logging.debug("schema_id sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + self._send_reply(m, msg.reply_to) def _queryData( self, msg, query, _idOnly=False ): @@ -600,23 +620,18 @@ class Agent(Thread): finally: self._lock.release() - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: content = {MsgKey.object_id:data_objs} else: content = {MsgKey.data_obj:data_objs} - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) + m = Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.data_ind)}, + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id - try: - tmp_snd.send(m) - logging.debug("data reply sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + self._send_reply(m, msg.reply_to) ##============================================================================== diff --git a/python/qmf2/common.py b/python/qmf2/common.py index 061c9fbe78..3679deeb2e 100644 --- a/python/qmf2/common.py +++ b/python/qmf2/common.py @@ -34,15 +34,6 @@ except ImportError: ## Constants ## - -AMQP_QMF_AGENT_LOCATE = "agent.locate" -AMQP_QMF_AGENT_INDICATION = "agent.ind" -AMQP_QMF_AGENT_EVENT="agent.event" -# agent.ind[.<agent-name>] -# agent.event.<sev>.<agent-name> -# sev="strings" -# - AMQP_QMF_SUBJECT = "qmf" AMQP_QMF_VERSION = 4 AMQP_QMF_SUBJECT_FMT = "%s%d.%s" @@ -168,38 +159,102 @@ class WorkItem(object): class QmfAddress(object): """ + Address format: "qmf.<domain>.[topic|direct]/<subject>" TBD """ + TYPE_DIRECT = "direct" TYPE_TOPIC = "topic" ADDRESS_FMT = "qmf.%s.%s/%s" DEFAULT_DOMAIN = "default" + # Directly-addressed messages: + # agent's direct address: "qmf.<domain>.direct/<agent-name> + # console's direct address: "qmf.<domain>.direct/<console-name> - def __init__(self, name, domain, type_): - self._name = name + # Well-known Topic Addresses: + # "qmf.<domain>.topic/<subject> + # Where <subject> has the following format: + # "console.ind#" - indications sent from consoles + # "agent.ind#" - indications sent from agents + # + # The following "well known" subjects are defined: + # + # console.ind.locate[.<agent-name>] - agent discovery request + # agent.ind.heartbeat[.<agent-name>"] - agent heartbeats + # agent.ind.event[.<severity>.<agent-name>] - events + # agent.ind.schema[TBD] - schema updates + # + SUBJECT_AGENT_IND="agent.ind" + SUBJECT_AGENT_HEARTBEAT = "agent.ind.heartbeat" + SUBJECT_AGENT_EVENT="agent.ind.event" + SUBJECT_AGENT_SCHEMA="agent.ind.schema" + + SUBJECT_CONSOLE_IND="console.ind" + SUBJECT_CONSOLE_LOCATE_AGENT="console.ind.locate" + + + + def __init__(self, subject, domain, type_): + if '/' in domain or '.' in domain: + raise Exception("domain string must not contain '/' or '.'" + " characters.") + + self._subject = subject self._domain = domain self._type = type_ - def _direct(cls, name, _domain=None): + def _direct(cls, subject, _domain=None): if _domain is None: _domain = QmfAddress.DEFAULT_DOMAIN - return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT) + return cls(subject, _domain, type_=QmfAddress.TYPE_DIRECT) direct = classmethod(_direct) - def _topic(cls, name, _domain=None): + def _topic(cls, subject, _domain=None): if _domain is None: _domain = QmfAddress.DEFAULT_DOMAIN - return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC) + return cls(subject, _domain, type_=QmfAddress.TYPE_TOPIC) topic = classmethod(_topic) + def __from_string(cls, address): + node,subject = address.split('/',1) + qmf,domain,type_ = node.split('.',2) + + if qmf != "qmf" or (type_ != QmfAddress.TYPE_DIRECT and + type_ != QmfAddress.TYPE_TOPIC): + raise ValueError("invalid QmfAddress format: %s" % address) + + return cls(subject, domain, type_) + from_string = classmethod(__from_string) def get_address(self): + """ + Return the QMF address as a string, suitable for use with the AMQP + messaging API. + """ return str(self) + def get_node(self): + """ + Return the 'node' portion of the address. + """ + return self.get_address().split('/',1)[0] + + def get_subject(self): + """ + Return the 'subject' portion of the address. + """ + return self.get_address().split('/',1)[1] + + def get_domain(self): + return self._domain + + def is_direct(self): + return self._type == self.TYPE_DIRECT + def __repr__(self): - return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name) + return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._subject) @@ -345,14 +400,14 @@ class QmfData(_mapEncoder): # not override it! self._schema = None - def _create(cls, values, _subtypes={}, _tag=None, _object_id=None, + def __create(cls, values, _subtypes={}, _tag=None, _object_id=None, _schema=None, _const=False): # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) return cls(_values=values, _subtypes=_subtypes, _tag=_tag, _ctime=ctime, _utime=ctime, _object_id=_object_id, _schema=_schema, _const=_const) - create = classmethod(_create) + create = classmethod(__create) def __from_map(cls, map_, _schema=None, _const=False): return cls(_map=map_, _schema=_schema, _const=_const) diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 8e8f4799f7..6e469a24ac 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -32,12 +32,10 @@ from qpid.messaging import Connection, Message, Empty, SendError from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, QmfQueryPredicate, MsgKey, QmfData, QmfAddress, - AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) - # global flag that indicates which thread (if any) is # running the console notifier callback _callback_thread=None @@ -280,11 +278,9 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - if _in_args: - _in_args = _in_args.copy() - if self._schema: # validate + _in_args = _in_args.copy() ms = self._schema.get_method(name) if ms is None: raise ValueError("Method '%s' is undefined." % name) @@ -410,6 +406,9 @@ class Agent(object): # msg.correlation_id = str(handle) if correlation_id: msg.correlation_id = str(correlation_id) + # TRACE + #logging.error("!!! Console %s sending to agent %s (%s)" % + # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle @@ -530,8 +529,8 @@ class Agent(object): def _sendQuery(self, query, correlation_id=None): """ """ - msg = Message(subject=makeSubject(OpCode.get_query), - properties={"method":"request"}, + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.get_query)}, content={MsgKey.query: query.map_encode()}) self._sendMsg( msg, correlation_id ) @@ -539,8 +538,8 @@ class Agent(object): def _sendMethodReq(self, mr_map, correlation_id=None): """ """ - msg = Message(subject=makeSubject(OpCode.method_req), - properties={"method":"request"}, + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.method_req)}, content=mr_map) self._sendMsg( msg, correlation_id ) @@ -664,6 +663,8 @@ class Console(Thread): raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) + + # for messages directly addressed to me self._direct_recvr = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" @@ -673,18 +674,30 @@ class Console(Thread): capacity=1) logging.debug("my direct addr=%s" % self._direct_recvr.source) - ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - self._announce_recvr = self._session.receiver(str(ind_addr) + - ";{create:always," - " node-properties:{type:topic}}", - capacity=1) - logging.debug("agent.ind addr=%s" % self._announce_recvr.source) - - locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - self._locate_sender = self._session.sender(str(locate_addr) + + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + logging.debug("my direct sender=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from agents + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", + self._domain) + self._topic_recvr = self._session.receiver(str(default_addr) + ";{create:always," - " node-properties:{type:topic}}") - logging.debug("agent.locate addr=%s" % self._locate_sender.target) + " node-properties:{type:topic}}", + capacity=1) + logging.debug("default topic recv addr=%s" % self._topic_recvr.source) + + + # for sending to topic subscribers + topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) + self._topic_sender = self._session.sender(str(topic_addr) + + ";{create:always," + " node-properties:{type:topic}}") + logging.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... @@ -709,11 +722,13 @@ class Console(Thread): self._operational = False if self.isAlive(): # kick my thread to wake it up - logging.debug("Making temp sender for [%s]" % self._address) - tmp_sender = self._session.sender(str(self._address)) + logging.debug("Sending noop to wake up [%s]" % self._address) try: - msg = Message(subject=makeSubject(OpCode.noop)) - tmp_sender.send( msg, sync=True ) + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.noop)}, + subject=self._name, + content={"noop":"noop"}) + self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) logging.debug("waiting for console receiver thread to exit") @@ -721,15 +736,16 @@ class Console(Thread): if self.isAlive(): logging.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() - self._announce_recvr.close() - self._locate_sender.close() + self._direct_sender.close() + self._topic_recvr.close() + self._topic_sender.close() self._session.close() self._session = None self._conn = None logging.debug("console connection removal complete") - def getAddress(self): + def get_address(self): """ The AMQP address this Console is listening to. """ @@ -752,7 +768,7 @@ class Console(Thread): def find_agent(self, name, timeout=None ): """ - Given the name of a particular agent, return an instance of class Agent + Given the name of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ @@ -769,23 +785,20 @@ class Console(Thread): handle = self._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") + + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) + msg = Message(subject="console.ind.locate." + name, + properties={"method":"request", + "qmf.subject":makeSubject(OpCode.agent_locate)}, + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) + msg.correlation_id = str(handle) + logging.debug("Sending Agent Locate (%s)" % time.time()) + # TRACE + #logging.error("!!! Console %s sending agent locate (%s)" % + # (self._name, str(msg))) try: - tmp_sender = self._session.sender(str(QmfAddress.direct(name, - self._domain)) - + ";{create:always," - " node-properties:" - " {type:topic," - " x-properties:" - " {type:direct}}}") - - query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) - msg = Message(subject=makeSubject(OpCode.agent_locate), - properties={"method":"request"}, - content={MsgKey.query: query.map_encode()}) - msg.reply_to = str(self._address) - msg.correlation_id = str(handle) - logging.debug("Sending Agent Locate (%s)" % time.time()) - tmp_sender.send( msg ) + self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) self._req_correlation.release(handle) @@ -807,6 +820,30 @@ class Console(Thread): return new_agent + def get_agents(self): + """ + Return the list of known agents. + """ + self._lock.acquire() + try: + agents = self._agent_map.values() + finally: + self._lock.release() + return agents + + + def get_agent(self, name): + """ + Return the named agent, else None if not currently available. + """ + self._lock.acquire() + try: + agent = self._agent_map.get(name) + finally: + self._lock.release() + return agent + + def doQuery(self, agent, query, timeout=None ): """ """ @@ -898,9 +935,12 @@ class Console(Thread): while True: try: - msg = self._announce_recvr.fetch(timeout=0) + msg = self._topic_recvr.fetch(timeout=0) except Empty: break + # TRACE: + # logging.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) while True: @@ -908,16 +948,11 @@ class Console(Thread): msg = self._direct_recvr.fetch(timeout = 0) except Empty: break + # TRACE + #logging.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) - for agent in self._agent_map.itervalues(): - try: - msg = agent._event_recvr.fetch(timeout = 0) - except Empty: - continue - self._dispatch(msg, _direct=False) - - self._expireAgents() # check for expired agents #if qLen == 0 and self._work_q.qsize() and self._notifier: @@ -1026,10 +1061,10 @@ class Console(Thread): """ logging.debug( "Message received from Agent! [%s]" % msg ) try: - version,opcode = parseSubject(msg.subject) + version,opcode = parseSubject(msg.properties.get("qmf.subject")) # @todo: deal with version mismatch!!! except: - logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject) + logging.error("Ignoring unrecognized message '%s'" % msg) return cmap = {}; props = {} @@ -1171,8 +1206,13 @@ class Console(Thread): if not emap: logging.debug("No '_event' field in event indication message.") return - # @todo: do I need to lock this??? - agent = self._agent_map.get(aname) + + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(aname) + finally: + self._lock.release() if not agent: logging.debug("Agent '%s' not known." % aname) return @@ -1251,17 +1291,6 @@ class Console(Thread): return None logging.debug("created agent sender %s" % agent._sender.target) - events_addr = QmfAddress.topic(name, self._domain) - try: - agent._event_recvr = self._session.receiver(str(events_addr) + - ";{create:always," - " node-properties:{type:topic}}", - capacity=1) - except: - logging.warning("Unable to create event receiver for %s" % name) - return None - logging.debug("created agent event receiver %s" % agent._event_recvr.source) - self._agent_map[name] = agent finally: self._lock.release() @@ -1372,7 +1401,8 @@ class Console(Thread): else: return None - + def __repr__(self): + return str(self._address) # def get_packages(self): # plist = [] diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py index 2e742b79be..e942c2fbc2 100644 --- a/python/qmf2/tests/__init__.py +++ b/python/qmf2/tests/__init__.py @@ -19,4 +19,8 @@ # under the License. # -import agent_discovery, basic_query, basic_method, obj_gets, events +import agent_discovery +import basic_query +import basic_method +import obj_gets +import events diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py index 3c530cc060..e820c2c839 100644 --- a/python/qmf2/tests/agent_discovery.py +++ b/python/qmf2/tests/agent_discovery.py @@ -17,6 +17,7 @@ # import unittest import logging +import time from threading import Thread, Event import qpid.messaging @@ -45,40 +46,39 @@ class _testNotifier(qmf2.common.Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url self.notifier = _testNotifier() self.agent = qmf2.agent.Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) # No database needed for this test + self.running = False + + def start_app(self): self.running = True self.start() - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False + # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): - logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!") + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # Connect the agent to the broker, + # broker_url = "user/passwd@hostname:port" + conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + conn.connect() + self.agent.set_connection(conn) + while self.running: self.notifier.wait_for_work(None) wi = self.agent.get_next_workitem(timeout=0) @@ -87,6 +87,9 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + # done, cleanup agent + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) class BaseTest(unittest.TestCase): @@ -97,26 +100,27 @@ class BaseTest(unittest.TestCase): def setUp(self): # one second agent indication interval - self.agent1 = _agentApp("agent1", 1) - self.agent1.connect_agent(self.broker) - self.agent2 = _agentApp("agent2", 1) - self.agent2.connect_agent(self.broker) + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() def tearDown(self): if self.agent1: - self.agent1.shutdown_agent(10) - self.agent1.stop() + self.agent1.stop_app() self.agent1 = None if self.agent2: - self.agent2.shutdown_agent(10) - self.agent2.stop() + self.agent2.stop_app() self.agent2 = None def test_discover_all(self): - # create console - # enable agent discovery - # wait - # expect agent add for agent1 and agent2 + """ + create console + enable agent discovery + wait + expect agent add for agent1 and agent2 + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -154,10 +158,12 @@ class BaseTest(unittest.TestCase): def test_discover_one(self): - # create console - # enable agent discovery, filter for agent1 only - # wait until timeout - # expect agent add for agent1 only + """ + create console + enable agent discovery, filter for agent1 only + wait until timeout + expect agent add for agent1 only + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -198,10 +204,12 @@ class BaseTest(unittest.TestCase): def test_heartbeat(self): - # create console with 2 sec agent timeout - # enable agent discovery, find all agents - # stop agent1, expect timeout notification - # stop agent2, expect timeout notification + """ + create console with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification + stop agent2, expect timeout notification + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=2) @@ -239,8 +247,7 @@ class BaseTest(unittest.TestCase): agent1 = self.agent1 self.agent1 = None - agent1.shutdown_agent(10) - agent1.stop() + agent1.stop_app() wi = self.console.get_next_workitem(timeout=4) while wi is not None: @@ -265,8 +272,7 @@ class BaseTest(unittest.TestCase): agent2 = self.agent2 self.agent2 = None - agent2.shutdown_agent(10) - agent2.stop() + agent2.stop_app() wi = self.console.get_next_workitem(timeout=4) while wi is not None: @@ -291,11 +297,13 @@ class BaseTest(unittest.TestCase): def test_find_agent(self): - # create console - # do not enable agent discovery - # find agent1, expect success - # find agent-none, expect failure - # find agent2, expect success + """ + create console + do not enable agent discovery + find agent1, expect success + find agent-none, expect failure + find agent2, expect success + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier) self.conn = qpid.messaging.Connection(self.broker.host, @@ -318,3 +326,154 @@ class BaseTest(unittest.TestCase): self.console.destroy(10) + def test_heartbeat_x2(self): + """ + create 2 consoles with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification on both consoles + stop agent2, expect timeout notification on both consoles + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + conn.connect() + console.addConnection(conn) + console.enable_agent_discovery() + self.consoles.append(console) + + # now wait for all consoles to discover all agents, + # agents send a heartbeat once a second + for console in self.consoles: + agent1_found = agent2_found = False + wi = console.get_next_workitem(timeout=2) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + wi = console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.stop_app() + + for console in self.consoles: + agent1_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.stop_app() + + for console in self.consoles: + agent2_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + + for console in self.consoles: + console.destroy(10) + + + def test_find_agent_x2(self): + """ + create 2 consoles, do not enable agent discovery + console-1: find agent1, expect success + console-2: find agent2, expect success + Verify console-1 does -not- know agent2 + Verify console-2 does -not- know agent1 + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + conn.connect() + console.addConnection(conn) + self.consoles.append(console) + + agent1 = self.consoles[0].find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + agent2 = self.consoles[1].find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # wait long enough for agent heartbeats to be sent... + + time.sleep(self.agent_heartbeat * 2) + + agents = self.consoles[0].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1") + agent1 = self.consoles[0].get_agent("agent1") + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + + agents = self.consoles[1].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2") + agent2 = self.consoles[1].get_agent("agent2") + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # verify no new agents were learned + + for console in self.consoles: + console.destroy(10) + diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py index b2d934728d..171cb80aaa 100644 --- a/python/qmf2/tests/events.py +++ b/python/qmf2/tests/events.py @@ -22,6 +22,7 @@ import logging from threading import Thread, Event import qpid.messaging +from qpid.harness import Skipped from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, QmfData, QmfQueryPredicate, SchemaEventClass, @@ -93,7 +94,11 @@ class _agentApp(Thread): self.broker_url.port, self.broker_url.user, self.broker_url.password) - conn.connect() + try: + conn.connect() + except qpid.messaging.ConnectError, e: + raise Skipped(e) + self.agent.set_connection(conn) counter = 1 @@ -150,12 +155,18 @@ class BaseTest(unittest.TestCase): self.broker.port, self.broker.user, self.broker.password) - self.conn.connect() + try: + self.conn.connect() + except qpid.messaging.ConnectError, e: + raise Skipped(e) + self.console.addConnection(self.conn) # find the agents for aname in ["agent1", "agent2"]: + print("!!! finding aname=%s (%s)" % (aname, time.time())) agent = self.console.find_agent(aname, timeout=3) + print("!!! agent=%s aname=%s (%s)" % (agent, aname, time.time())) self.assertTrue(agent and agent.get_name() == aname) # now wait for events |