diff options
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 168 |
1 files changed, 99 insertions, 69 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 8e8f4799f7..6e469a24ac 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -32,12 +32,10 @@ from qpid.messaging import Connection, Message, Empty, SendError from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, QmfQueryPredicate, MsgKey, QmfData, QmfAddress, - AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) - # global flag that indicates which thread (if any) is # running the console notifier callback _callback_thread=None @@ -280,11 +278,9 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - if _in_args: - _in_args = _in_args.copy() - if self._schema: # validate + _in_args = _in_args.copy() ms = self._schema.get_method(name) if ms is None: raise ValueError("Method '%s' is undefined." % name) @@ -410,6 +406,9 @@ class Agent(object): # msg.correlation_id = str(handle) if correlation_id: msg.correlation_id = str(correlation_id) + # TRACE + #logging.error("!!! Console %s sending to agent %s (%s)" % + # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle @@ -530,8 +529,8 @@ class Agent(object): def _sendQuery(self, query, correlation_id=None): """ """ - msg = Message(subject=makeSubject(OpCode.get_query), - properties={"method":"request"}, + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.get_query)}, content={MsgKey.query: query.map_encode()}) self._sendMsg( msg, correlation_id ) @@ -539,8 +538,8 @@ class Agent(object): def _sendMethodReq(self, mr_map, correlation_id=None): """ """ - msg = Message(subject=makeSubject(OpCode.method_req), - properties={"method":"request"}, + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.method_req)}, content=mr_map) self._sendMsg( msg, correlation_id ) @@ -664,6 +663,8 @@ class Console(Thread): raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) + + # for messages directly addressed to me self._direct_recvr = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" @@ -673,18 +674,30 @@ class Console(Thread): capacity=1) logging.debug("my direct addr=%s" % self._direct_recvr.source) - ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - self._announce_recvr = self._session.receiver(str(ind_addr) + - ";{create:always," - " node-properties:{type:topic}}", - capacity=1) - logging.debug("agent.ind addr=%s" % self._announce_recvr.source) - - locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - self._locate_sender = self._session.sender(str(locate_addr) + + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + logging.debug("my direct sender=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from agents + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", + self._domain) + self._topic_recvr = self._session.receiver(str(default_addr) + ";{create:always," - " node-properties:{type:topic}}") - logging.debug("agent.locate addr=%s" % self._locate_sender.target) + " node-properties:{type:topic}}", + capacity=1) + logging.debug("default topic recv addr=%s" % self._topic_recvr.source) + + + # for sending to topic subscribers + topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) + self._topic_sender = self._session.sender(str(topic_addr) + + ";{create:always," + " node-properties:{type:topic}}") + logging.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... @@ -709,11 +722,13 @@ class Console(Thread): self._operational = False if self.isAlive(): # kick my thread to wake it up - logging.debug("Making temp sender for [%s]" % self._address) - tmp_sender = self._session.sender(str(self._address)) + logging.debug("Sending noop to wake up [%s]" % self._address) try: - msg = Message(subject=makeSubject(OpCode.noop)) - tmp_sender.send( msg, sync=True ) + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.noop)}, + subject=self._name, + content={"noop":"noop"}) + self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) logging.debug("waiting for console receiver thread to exit") @@ -721,15 +736,16 @@ class Console(Thread): if self.isAlive(): logging.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() - self._announce_recvr.close() - self._locate_sender.close() + self._direct_sender.close() + self._topic_recvr.close() + self._topic_sender.close() self._session.close() self._session = None self._conn = None logging.debug("console connection removal complete") - def getAddress(self): + def get_address(self): """ The AMQP address this Console is listening to. """ @@ -752,7 +768,7 @@ class Console(Thread): def find_agent(self, name, timeout=None ): """ - Given the name of a particular agent, return an instance of class Agent + Given the name of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ @@ -769,23 +785,20 @@ class Console(Thread): handle = self._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") + + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) + msg = Message(subject="console.ind.locate." + name, + properties={"method":"request", + "qmf.subject":makeSubject(OpCode.agent_locate)}, + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) + msg.correlation_id = str(handle) + logging.debug("Sending Agent Locate (%s)" % time.time()) + # TRACE + #logging.error("!!! Console %s sending agent locate (%s)" % + # (self._name, str(msg))) try: - tmp_sender = self._session.sender(str(QmfAddress.direct(name, - self._domain)) - + ";{create:always," - " node-properties:" - " {type:topic," - " x-properties:" - " {type:direct}}}") - - query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) - msg = Message(subject=makeSubject(OpCode.agent_locate), - properties={"method":"request"}, - content={MsgKey.query: query.map_encode()}) - msg.reply_to = str(self._address) - msg.correlation_id = str(handle) - logging.debug("Sending Agent Locate (%s)" % time.time()) - tmp_sender.send( msg ) + self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) self._req_correlation.release(handle) @@ -807,6 +820,30 @@ class Console(Thread): return new_agent + def get_agents(self): + """ + Return the list of known agents. + """ + self._lock.acquire() + try: + agents = self._agent_map.values() + finally: + self._lock.release() + return agents + + + def get_agent(self, name): + """ + Return the named agent, else None if not currently available. + """ + self._lock.acquire() + try: + agent = self._agent_map.get(name) + finally: + self._lock.release() + return agent + + def doQuery(self, agent, query, timeout=None ): """ """ @@ -898,9 +935,12 @@ class Console(Thread): while True: try: - msg = self._announce_recvr.fetch(timeout=0) + msg = self._topic_recvr.fetch(timeout=0) except Empty: break + # TRACE: + # logging.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) while True: @@ -908,16 +948,11 @@ class Console(Thread): msg = self._direct_recvr.fetch(timeout = 0) except Empty: break + # TRACE + #logging.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) - for agent in self._agent_map.itervalues(): - try: - msg = agent._event_recvr.fetch(timeout = 0) - except Empty: - continue - self._dispatch(msg, _direct=False) - - self._expireAgents() # check for expired agents #if qLen == 0 and self._work_q.qsize() and self._notifier: @@ -1026,10 +1061,10 @@ class Console(Thread): """ logging.debug( "Message received from Agent! [%s]" % msg ) try: - version,opcode = parseSubject(msg.subject) + version,opcode = parseSubject(msg.properties.get("qmf.subject")) # @todo: deal with version mismatch!!! except: - logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject) + logging.error("Ignoring unrecognized message '%s'" % msg) return cmap = {}; props = {} @@ -1171,8 +1206,13 @@ class Console(Thread): if not emap: logging.debug("No '_event' field in event indication message.") return - # @todo: do I need to lock this??? - agent = self._agent_map.get(aname) + + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(aname) + finally: + self._lock.release() if not agent: logging.debug("Agent '%s' not known." % aname) return @@ -1251,17 +1291,6 @@ class Console(Thread): return None logging.debug("created agent sender %s" % agent._sender.target) - events_addr = QmfAddress.topic(name, self._domain) - try: - agent._event_recvr = self._session.receiver(str(events_addr) + - ";{create:always," - " node-properties:{type:topic}}", - capacity=1) - except: - logging.warning("Unable to create event receiver for %s" % name) - return None - logging.debug("created agent event receiver %s" % agent._event_recvr.source) - self._agent_map[name] = agent finally: self._lock.release() @@ -1372,7 +1401,8 @@ class Console(Thread): else: return None - + def __repr__(self): + return str(self._address) # def get_packages(self): # plist = [] |