diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2/console.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/console.py | 284 |
1 files changed, 144 insertions, 140 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index afd20c3655..b62aa7342b 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -18,11 +18,11 @@ # import sys import os -import logging import platform import time import datetime import Queue +from logging import getLogger from threading import Thread, Event from threading import RLock from threading import currentThread @@ -41,6 +41,8 @@ from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType, _callback_thread=None +log = getLogger("qmf") +trace = getLogger("qmf.console") ##============================================================================== @@ -213,6 +215,7 @@ class _QueryMailbox(_AsyncMailbox): Process query response messages delivered to this mailbox. Invoked by Console Management thread only. """ + trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name) objects = reply.content if isinstance(objects, type([])): # convert from map to native types if needed @@ -253,7 +256,7 @@ class _QueryMailbox(_AsyncMailbox): self.result += objects if not "partial" in reply.properties: - # logging.error("QUERY COMPLETE for %s" % str(self.context)) + # log.error("QUERY COMPLETE for %s" % str(self.context)) wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) self.console._work_q_put = True @@ -262,8 +265,7 @@ class _QueryMailbox(_AsyncMailbox): def expire(self): - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) + trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name) # send along whatever (possibly none) has been received so far wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) @@ -291,6 +293,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): """ Process schema response messages. """ + trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id) done = False schemas = reply.content if schemas and isinstance(schemas, type([])): @@ -309,6 +312,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): def expire(self): + trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id) self.destroy() @@ -332,10 +336,10 @@ class _MethodMailbox(_AsyncMailbox): Process method response messages delivered to this mailbox. Invoked by Console Management thread only. """ - + trace.debug("Delivering to method mailbox.") _map = reply.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") result = None else: error=_map.get(SchemaMethod.KEY_ERROR) @@ -358,8 +362,7 @@ class _MethodMailbox(_AsyncMailbox): The mailbox expired without receiving a reply. Invoked by the Console Management thread only. """ - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) + trace.debug("Expiring method mailbox.") # send along an empty response wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) self.console._work_q.put(wi) @@ -391,30 +394,30 @@ class _SubscriptionMailbox(_AsyncMailbox): def subscribe(self, query): agent = self.console.get_agent(self.agent_name) if not agent: - logging.warning("subscribed failed - unknown agent '%s'" % + log.warning("subscribed failed - unknown agent '%s'" % self.agent_name) return False try: - logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name) + trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name) agent._send_subscribe_req(query, self.get_address(), self.interval, self.duration) except SendError, e: - logging.error(str(e)) + log.error(str(e)) return False return True def resubscribe(self, duration): agent = self.console.get_agent(self.agent_name) if not agent: - logging.warning("resubscribed failed - unknown agent '%s'" % + log.warning("resubscribed failed - unknown agent '%s'" % self.agent_name) return False try: - logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name) + trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name) agent._send_resubscribe_req(self.get_address(), self.agent_subscription_id, duration) except SendError, e: - logging.error(str(e)) + log.error(str(e)) return False return True @@ -430,7 +433,7 @@ class _SubscriptionMailbox(_AsyncMailbox): try: e_map = QmfData.from_map(error) except TypeError: - logging.warning("Invalid QmfData map received: '%s'" + log.warning("Invalid QmfData map received: '%s'" % str(error)) e_map = QmfData.create({"error":"Unknown error"}) sp = SubscribeParams(None, None, None, e_map) @@ -456,12 +459,12 @@ class _SubscriptionMailbox(_AsyncMailbox): # else: data indication agent_name = msg.properties.get("qmf.agent") if not agent_name: - logging.warning("Ignoring data_ind - no agent name given: %s" % + log.warning("Ignoring data_ind - no agent name given: %s" % msg) return agent = self.console.get_agent(agent_name) if not agent: - logging.warning("Ignoring data_ind - unknown agent '%s'" % + log.warning("Ignoring data_ind - unknown agent '%s'" % agent_name) return @@ -625,7 +628,7 @@ class QmfConsoleData(QmfData): contents. """ if _reply_handle is not None: - logging.error(" ASYNC REFRESH TBD!!!") + log.error(" ASYNC REFRESH TBD!!!") return None assert self._agent @@ -677,28 +680,28 @@ class QmfConsoleData(QmfData): if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args - logging.debug("Sending method req to Agent (%s)" % time.time()) + trace.debug("Sending method req to Agent (%s)" % time.time()) try: self._agent._send_method_req(_map, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True - logging.debug("Waiting for response to method req (%s)" % _timeout) + trace.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: - logging.debug("Agent method req wait timed-out.") + trace.debug("Agent method req wait timed-out.") return None _map = replyMsg.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") return None error=_map.get(SchemaMethod.KEY_ERROR) @@ -751,7 +754,7 @@ class Agent(object): self._packages = {} # map of {package-name:[list of class-names], } for this agent self._subscriptions = [] # list of active standing subscriptions for this agent self._announce_timestamp = None # datetime when last announce received - logging.debug( "Created Agent with address: [%s]" % self._address ) + trace.debug( "Created Agent with address: [%s]" % self._address ) def get_name(self): @@ -768,7 +771,7 @@ class Agent(object): if correlation_id: msg.correlation_id = str(correlation_id) # TRACE - #logging.error("!!! Console %s sending to agent %s (%s)" % + #log.error("!!! Console %s sending to agent %s (%s)" % # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle @@ -846,28 +849,28 @@ class Agent(object): if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() - logging.debug("Sending method req to Agent (%s)" % time.time()) + trace.debug("Sending method req to Agent (%s)" % time.time()) try: self._send_method_req(_map, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True - logging.debug("Waiting for response to method req (%s)" % _timeout) + trace.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: - logging.debug("Agent method req wait timed-out.") + trace.debug("Agent method req wait timed-out.") return None _map = replyMsg.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") return None return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), @@ -1076,10 +1079,10 @@ class Console(Thread): @type timeout: float @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. """ - logging.debug("Destroying Console...") + trace.debug("Destroying Console...") if self._conn: self.remove_connection(self._conn, timeout) - logging.debug("Console Destroyed") + trace.debug("Console Destroyed") def add_connection(self, conn): """ @@ -1103,7 +1106,7 @@ class Console(Thread): " x-properties:" " {type:direct}}}", capacity=1) - logging.debug("my direct addr=%s" % self._direct_recvr.source) + trace.debug("my direct addr=%s" % self._direct_recvr.source) self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," @@ -1111,7 +1114,7 @@ class Console(Thread): " {type:topic," " x-properties:" " {type:direct}}}") - logging.debug("my direct sender=%s" % self._direct_sender.target) + trace.debug("my direct sender=%s" % self._direct_sender.target) # for receiving "broadcast" messages from agents default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", @@ -1120,7 +1123,7 @@ class Console(Thread): ";{create:always," " node-properties:{type:topic}}", capacity=1) - logging.debug("default topic recv addr=%s" % self._topic_recvr.source) + trace.debug("default topic recv addr=%s" % self._topic_recvr.source) # for sending to topic subscribers @@ -1128,7 +1131,7 @@ class Console(Thread): self._topic_sender = self._session.sender(str(topic_addr) + ";{create:always," " node-properties:{type:topic}}") - logging.debug("default topic send addr=%s" % self._topic_sender.target) + trace.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... @@ -1150,17 +1153,17 @@ class Console(Thread): @param conn: connection previously added by add_connection() """ if self._conn and conn and conn != self._conn: - logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + log.error( "Attempt to delete unknown connection: %s" % str(conn)) # tell connection thread to shutdown self._operational = False if self.isAlive(): # kick my thread to wake it up self._wake_thread() - logging.debug("waiting for console receiver thread to exit") + trace.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): - logging.error( "Console thread '%s' is hung..." % self.getName() ) + log.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() self._direct_sender.close() self._topic_recvr.close() @@ -1168,7 +1171,7 @@ class Console(Thread): self._session.close() self._session = None self._conn = None - logging.debug("console connection removal complete") + trace.debug("console connection removal complete") def get_address(self): @@ -1219,14 +1222,14 @@ class Console(Thread): content=query._predicate) msg.reply_to = str(self._address) msg.correlation_id = str(cid) - logging.debug("Sending Agent Locate (%s)" % time.time()) + trace.debug("Sending Agent Locate (%s)" % time.time()) # TRACE - #logging.error("!!! Console %s sending agent locate (%s)" % + #log.error("!!! Console %s sending agent locate (%s)" % # (self._name, str(msg))) try: self._topic_sender.send(msg) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None @@ -1234,10 +1237,10 @@ class Console(Thread): timeout = self._reply_timeout new_agent = None - logging.debug("Waiting for response to Agent Locate (%s)" % timeout) + trace.debug("Waiting for response to Agent Locate (%s)" % timeout) mbox.fetch(timeout) mbox.destroy() - logging.debug("Agent Locate wait ended (%s)" % time.time()) + trace.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) @@ -1288,10 +1291,10 @@ class Console(Thread): cid = mbox.get_address() try: - logging.debug("Sending Query to Agent (%s)" % time.time()) + trace.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None @@ -1302,7 +1305,7 @@ class Console(Thread): if not _timeout: _timeout = self._reply_timeout - logging.debug("Waiting for response to Query (%s)" % _timeout) + trace.debug("Waiting for response to Query (%s)" % _timeout) now = datetime.datetime.utcnow() expire = now + datetime.timedelta(seconds=_timeout) @@ -1311,7 +1314,7 @@ class Console(Thread): _timeout = timedelta_to_secs(expire - now) reply = mbox.fetch(_timeout) if not reply: - logging.debug("Query wait timed-out.") + trace.debug("Query wait timed-out.") break objects = reply.content @@ -1383,12 +1386,12 @@ class Console(Thread): mbox.destroy() return None - logging.debug("Waiting for response to subscription (%s)" % _timeout) + trace.debug("Waiting for response to subscription (%s)" % _timeout) # @todo: what if mbox expires here? sp = mbox.fetch(_timeout) if not sp: - logging.debug("Subscription request wait timed-out.") + trace.debug("Subscription request wait timed-out.") mbox.destroy() return None @@ -1405,7 +1408,7 @@ class Console(Thread): mbox = self._get_mailbox(subscription_id) if not mbox: - logging.warning("Subscription %s not found." % subscription_id) + log.warning("Subscription %s not found." % subscription_id) return None if isinstance(mbox, _AsyncSubscriptionMailbox): @@ -1418,11 +1421,11 @@ class Console(Thread): # wait for reply - logging.debug("Waiting for response to subscription (%s)" % _timeout) + trace.debug("Waiting for response to subscription (%s)" % _timeout) sp = mbox.fetch(_timeout) if not sp: - logging.debug("re-subscribe request wait timed-out.") + trace.debug("re-subscribe request wait timed-out.") # @todo???? mbox.destroy() return None @@ -1439,11 +1442,11 @@ class Console(Thread): agent = self.get_agent(mbox.agent_name) if agent: try: - logging.debug("Sending UnSubscribe to Agent (%s)" % time.time()) + trace.debug("Sending UnSubscribe to Agent (%s)" % time.time()) agent._send_unsubscribe_ind(subscription_id, mbox.agent_subscription_id) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() @@ -1453,16 +1456,16 @@ class Console(Thread): Make the console management thread loop wakeup from its next_receiver sleep. """ - logging.debug("Sending noop to wake up [%s]" % self._address) + trace.debug("Sending noop to wake up [%s]" % self._address) msg = Message(id=QMF_APP_ID, subject=self._name, - properties={"method":"request", + properties={"method":"indication", "qmf.opcode":OpCode.noop}, content={}) try: self._direct_sender.send( msg, sync=True ) except SendError, e: - logging.error(str(e)) + log.error(str(e)) def run(self): @@ -1484,7 +1487,7 @@ class Console(Thread): except Empty: break # TRACE: - # logging.error("!!! Console %s: msg on %s [%s]" % + # log.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) @@ -1494,7 +1497,7 @@ class Console(Thread): except Empty: break # TRACE - #logging.error("!!! Console %s: msg on %s [%s]" % + #log.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) @@ -1506,36 +1509,37 @@ class Console(Thread): # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() - logging.info("Calling console notifier.indication") + trace.debug("Calling console notifier.indication") self._notifier.indication() _callback_thread = None - if self._operational: - # wait for a message to arrive, or an agent - # to expire, or a mailbox requrest to time out - now = datetime.datetime.utcnow() - next_expire = self._next_agent_expire - # the mailbox expire flag may be cleared by the - # app thread(s) - self._lock.acquire() - try: - if (self._next_mbox_expire and - self._next_mbox_expire < next_expire): - next_expire = self._next_mbox_expire - finally: - self._lock.release() + # wait for a message to arrive, or an agent + # to expire, or a mailbox requrest to time out + now = datetime.datetime.utcnow() + next_expire = self._next_agent_expire - if next_expire > now: - timeout = timedelta_to_secs(next_expire - now) - try: - logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) - xxx = self._session.next_receiver(timeout = timeout) - except Empty: - pass + self._lock.acquire() + try: + # the mailbox expire flag may be cleared by the + # app thread(s) to force an immedate mailbox scan + if self._next_mbox_expire is None: + next_expire = now + elif self._next_mbox_expire < next_expire: + next_expire = self._next_mbox_expire + finally: + self._lock.release() + timeout = timedelta_to_secs(next_expire - now) + + if self._operational and timeout > 0.0: + try: + trace.debug("waiting for next rcvr (timeout=%s)..." % timeout) + self._session.next_receiver(timeout = timeout) + except Empty: + pass - logging.debug("Shutting down Console thread") + trace.debug("Shutting down Console thread") def get_objects(self, _object_id=None, @@ -1639,12 +1643,11 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - #logging.debug( "Message received from Agent! [%s]" % msg ) - #logging.error( "Message received from Agent! [%s]" % msg ) + trace.debug( "Message received from Agent! [%s]" % msg ) opcode = msg.properties.get("qmf.opcode") if not opcode: - logging.error("Ignoring unrecognized message '%s'" % msg) + log.error("Ignoring unrecognized message '%s'" % msg) return version = 2 # @todo: fix me @@ -1672,9 +1675,9 @@ class Console(Thread): else: self._handle_indication_msg(msg, cmap, version, _direct) elif opcode == OpCode.noop: - logging.debug("No-op msg received.") + trace.debug("No-op msg received.") else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) def _handle_agent_ind_msg(self, msg, cmap, version, direct): @@ -1682,15 +1685,15 @@ class Console(Thread): Process a received agent-ind message. This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ - logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) + trace.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) ai_map = msg.content if not ai_map or not isinstance(ai_map, type({})): - logging.warning("Bad agent-ind message received: '%s'" % msg) + log.warning("Bad agent-ind message received: '%s'" % msg) return name = ai_map.get("_name") if not name: - logging.warning("Bad agent-ind message received: agent name missing" + log.warning("Bad agent-ind message received: agent name missing" " '%s'" % msg) return @@ -1724,48 +1727,48 @@ class Console(Thread): if matched: # unsolicited, but newly discovered - logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) self._work_q_put = True if correlated: # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ - logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) + trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) if not mbox: - logging.debug("Response msg received with unknown correlation_id" - " msg='%s'" % str(msg)) + log.warning("Response msg received with unknown correlation_id" + " msg='%s'" % str(msg)) return # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_indication_msg(self, msg, cmap, version, _direct): aname = msg.properties.get("qmf.agent") if not aname: - logging.debug("No agent name field in indication message.") + trace.debug("No agent name field in indication message.") return content_type = msg.properties.get("qmf.content") if (content_type != ContentType.event or not isinstance(msg.content, type([]))): - logging.warning("Bad event indication message received: '%s'" % msg) + log.warning("Bad event indication message received: '%s'" % msg) return emap = msg.content[0] if not isinstance(emap, type({})): - logging.debug("Invalid event body in indication message: '%s'" % msg) + trace.debug("Invalid event body in indication message: '%s'" % msg) return agent = None @@ -1775,18 +1778,18 @@ class Console(Thread): finally: self._lock.release() if not agent: - logging.debug("Agent '%s' not known." % aname) + trace.debug("Agent '%s' not known." % aname) return try: # @todo: schema??? event = QmfEvent.from_map(emap) except TypeError: - logging.debug("Invalid QmfEvent map received: %s" % str(emap)) + trace.debug("Invalid QmfEvent map received: %s" % str(emap)) return # @todo: schema? Need to fetch it, but not from this thread! # This thread can not pend on a request. - logging.debug("Publishing event received from agent %s" % aname) + trace.debug("Publishing event received from agent %s" % aname) wi = WorkItem(WorkItem.EVENT_RECEIVED, None, {"agent":agent, "event":event}) @@ -1835,12 +1838,12 @@ class Console(Thread): next_expire_delta = lifetime_delta self._lock.acquire() try: - logging.debug("!!! expiring agents '%s'" % now) + trace.debug("!!! expiring agents '%s'" % now) for agent in self._agent_map.itervalues(): if agent._announce_timestamp: agent_deathtime = agent._announce_timestamp + lifetime_delta if agent_deathtime <= now: - logging.debug("AGENT_DELETED for %s" % agent) + trace.debug("AGENT_DELETED for %s" % agent) agent._announce_timestamp = None wi = WorkItem(WorkItem.AGENT_DELETED, None, {"agent":agent}) @@ -1852,7 +1855,7 @@ class Console(Thread): next_expire_delta = agent_deathtime - now self._next_agent_expire = now + next_expire_delta - logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) finally: self._lock.release() @@ -1862,7 +1865,7 @@ class Console(Thread): """ Factory to create/retrieve an agent for this console """ - logging.debug("creating agent %s" % name) + trace.debug("creating agent %s" % name) self._lock.acquire() try: agent = self._agent_map.get(name) @@ -1878,9 +1881,9 @@ class Console(Thread): " x-properties:" " {type:direct}}}") except: - logging.warning("Unable to create sender for %s" % name) + log.warning("Unable to create sender for %s" % name) return None - logging.debug("created agent sender %s" % agent._sender.target) + trace.debug("created agent sender %s" % agent._sender.target) self._agent_map[name] = agent finally: @@ -1984,11 +1987,11 @@ class Console(Thread): if need_fetch: mbox = _SchemaPrefetchMailbox(self, schema_id) query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) - logging.debug("Sending Schema Query to Agent (%s)" % time.time()) + trace.debug("Sending Schema Query to Agent (%s)" % time.time()) try: agent._send_query(query, mbox.get_address()) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() self._lock.acquire() try: @@ -2041,7 +2044,7 @@ class Console(Thread): try: mid = long(mid) except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) + log.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() @@ -2056,7 +2059,7 @@ class Console(Thread): try: mid = long(mid) except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) + log.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() @@ -2242,36 +2245,36 @@ class Console(Thread): # count += 1 # try: # if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: -# logging.debug("Console Event AGENT_ADDED received") +# trace.debug("Console Event AGENT_ADDED received") # if self._handler: # self._handler.agent_added(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: -# logging.debug("Console Event AGENT_DELETED received") +# trace.debug("Console Event AGENT_DELETED received") # if self._handler: # self._handler.agent_deleted(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: -# logging.debug("Console Event NEW_PACKAGE received") +# trace.debug("Console Event NEW_PACKAGE received") # if self._handler: # self._handler.new_package(self._event.name) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: -# logging.debug("Console Event NEW_CLASS received") +# trace.debug("Console Event NEW_CLASS received") # if self._handler: # self._handler.new_class(SchemaClassKey(self._event.classKey)) # elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: -# logging.debug("Console Event OBJECT_UPDATE received") +# trace.debug("Console Event OBJECT_UPDATE received") # if self._handler: # self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), # self._event.hasProps, self._event.hasStats) # elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: -# logging.debug("Console Event EVENT_RECEIVED received") +# trace.debug("Console Event EVENT_RECEIVED received") # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: -# logging.debug("Console Event AGENT_HEARTBEAT received") +# trace.debug("Console Event AGENT_HEARTBEAT received") # if self._handler: # self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) # elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: -# logging.debug("Console Event METHOD_RESPONSE received") +# trace.debug("Console Event METHOD_RESPONSE received") # else: -# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) # except e: # print "Exception caught in callback thread:", e # self.impl.popEvent() @@ -2300,17 +2303,17 @@ class Console(Thread): # def shutdown(self): -# logging.debug("broker.shutdown() called.") +# trace.debug("broker.shutdown() called.") # self.console.impl.delConnection(self.impl) # self.conn.del_conn_handler(self) # if self._session: # self.impl.sessionClosed() -# logging.debug("broker.shutdown() sessionClosed done.") +# trace.debug("broker.shutdown() sessionClosed done.") # self._session.destroy() -# logging.debug("broker.shutdown() session destroy done.") +# trace.debug("broker.shutdown() session destroy done.") # self._session = None # self._operational = False -# logging.debug("broker.shutdown() done.") +# trace.debug("broker.shutdown() done.") # def wait_for_stable(self, timeout = None): @@ -2343,24 +2346,24 @@ class Console(Thread): # while valid: # count += 1 # if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: -# logging.debug("Broker Event BROKER_INFO received"); +# trace.debug("Broker Event BROKER_INFO received"); # elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: -# logging.debug("Broker Event DECLARE_QUEUE received"); +# trace.debug("Broker Event DECLARE_QUEUE received"); # self.conn.impl.declareQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: -# logging.debug("Broker Event DELETE_QUEUE received"); +# trace.debug("Broker Event DELETE_QUEUE received"); # self.conn.impl.deleteQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.BIND: -# logging.debug("Broker Event BIND received"); +# trace.debug("Broker Event BIND received"); # self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.UNBIND: -# logging.debug("Broker Event UNBIND received"); +# trace.debug("Broker Event UNBIND received"); # self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: -# logging.debug("Broker Event SETUP_COMPLETE received"); +# trace.debug("Broker Event SETUP_COMPLETE received"); # self.impl.startProtocol() # elif self._event.kind == qmfengine.BrokerEvent.STABLE: -# logging.debug("Broker Event STABLE received"); +# trace.debug("Broker Event STABLE received"); # self._cv.acquire() # try: # self._stable = True @@ -2387,7 +2390,7 @@ class Console(Thread): # valid = self.impl.getXmtMessage(self._xmtMessage) # while valid: # count += 1 -# logging.debug("Broker: sending msg on connection") +# trace.debug("Broker: sending msg on connection") # self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) # self.impl.popXmt() # valid = self.impl.getXmtMessage(self._xmtMessage) @@ -2405,14 +2408,14 @@ class Console(Thread): # def conn_event_connected(self): -# logging.debug("Broker: Connection event CONNECTED") +# trace.debug("Broker: Connection event CONNECTED") # self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) # self.impl.sessionOpened(self._session.handle) # self._do_events() # def conn_event_disconnected(self, error): -# logging.debug("Broker: Connection event DISCONNECTED") +# trace.debug("Broker: Connection event DISCONNECTED") # pass @@ -2421,14 +2424,14 @@ class Console(Thread): # def sess_event_session_closed(self, context, error): -# logging.debug("Broker: Session event CLOSED") +# trace.debug("Broker: Session event CLOSED") # self.impl.sessionClosed() # def sess_event_recv(self, context, message): -# logging.debug("Broker: Session event MSG_RECV") +# trace.debug("Broker: Session event MSG_RECV") # if not self._operational: -# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) # self.impl.handleRcvMessage(message) # self._do_events() @@ -2446,6 +2449,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code + import logging from common import (qmfTypes, SchemaProperty) logging.getLogger().setLevel(logging.INFO) |