diff options
author | Ted Ross <tross@apache.org> | 2009-10-23 18:11:42 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-10-23 18:11:42 +0000 |
commit | c677d6ffec635f28e3e48255cfca3508a13b40ae (patch) | |
tree | d853652d1f2b631c71de8036f6de954dc34b201c | |
parent | 817ea4b82aad3b7f48994b40708a0b9a87b2be81 (diff) | |
download | qpid-python-c677d6ffec635f28e3e48255cfca3508a13b40ae.tar.gz |
QPID-2156 - Add thread shutdown to python QMF bindings, additional logging, native console test.
Committed patch from Ken Giusti.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829161 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/bindings/qmf/python/qmf.py | 140 |
1 files changed, 99 insertions, 41 deletions
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py index 2064eb85cf..871a25e207 100644 --- a/cpp/bindings/qmf/python/qmf.py +++ b/cpp/bindings/qmf/python/qmf.py @@ -19,6 +19,7 @@ import sys import socket import os +import logging from threading import Thread from threading import RLock from threading import Condition @@ -109,8 +110,19 @@ class Connection(Thread): self._conn_handlers_to_delete = [] self._conn_handlers = [] self._connected = False + self._operational = True self.start() - + + + def destroy(self, timeout=None): + logging.debug("Destroying Connection...") + self._operational = False + self.kick() + self.join(timeout) + logging.debug("... Conn Destroyed!" ) + if self.isAlive(): + logging.error("Error: Connection thread '%s' is hung..." % self.getName()) + def connected(self): return self._connected @@ -145,8 +157,8 @@ class Connection(Thread): del_handlers = [] bt_count = 0 - while True: - # print "Waiting for socket data" + while self._operational: + logging.debug("Connection thread waiting for socket data...") self._sock.recv(1) self._lock.acquire() @@ -173,24 +185,31 @@ class Connection(Thread): while valid: try: if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED: + logging.debug("Connection thread: CONNECTED event received.") self._connected = True for h in self._conn_handlers: h.conn_event_connected() elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED: + logging.debug("Connection thread: DISCONNECTED event received.") self._connected = False for h in self._conn_handlers: h.conn_event_disconnected(eventImpl.errorText) elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED: + logging.debug("Connection thread: SESSION_CLOSED event received.") eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV: + logging.debug("Connection thread: RECV event received.") eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) + else: + logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind)) except: import traceback - print "Event Exception:", sys.exc_info() + logging.error( "Exception occurred during Connection event processing:" ) + logging.error( str(sys.exc_info()) ) if bt_count < 2: traceback.print_exc() traceback.print_stack() @@ -202,6 +221,8 @@ class Connection(Thread): for h in self._conn_handlers: h.conn_event_visit() + logging.debug("Shutting down Connection thread") + class Session: @@ -296,7 +317,7 @@ class QmfObject(object): # when TYPE_OBJECT # when TYPE_LIST # when TYPE_ARRAY - print "Unsupported type for get_attr?", val.getType() + logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) return None @@ -329,7 +350,7 @@ class QmfObject(object): # when TYPE_OBJECT # when TYPE_LIST # when TYPE_ARRAY - print "Unsupported type for get_attr?", val.getType() + logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) return None @@ -626,7 +647,7 @@ class Arguments(object): # when TYPE_OBJECT # when TYPE_LIST # when TYPE_ARRAY - print "Unsupported Type for Get?", val.getType() + logging.error( "Unsupported Type for Get? '%s'" % str(val.getType())) return None @@ -663,7 +684,7 @@ class Arguments(object): # when TYPE_OBJECT # when TYPE_LIST # when TYPE_ARRAY - print "Unsupported Type for Set?", val.getType() + logging.error("Unsupported Type for Set? '%s'" % str(val.getType())) return None @@ -960,8 +981,19 @@ class Console(Thread): self._sync_result = None self._select = {} self._cb_cond = Condition() + self._operational = True self.start() + + def destroy(self, timeout=None): + logging.debug("Destroying Console...") + self._operational = False + self.start_console_events() # wake thread up + self.join(timeout) + logging.debug("... Console Destroyed!") + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + def add_connection(self, conn): broker = Broker(self, conn) @@ -974,12 +1006,15 @@ class Console(Thread): def del_connection(self, broker): + logging.debug("shutting down broker...") broker.shutdown() + logging.debug("...broker down.") self._cv.acquire() try: self._broker_list.remove(broker) finally: self._cv.release() + logging.debug("del_connection() finished") def packages(self): @@ -1129,14 +1164,15 @@ class Console(Thread): def run(self): - while True: - self._cb_cond.acquire() - try: - self._cb_cond.wait(1) - while self.do_console_events(): - pass - finally: - self._cb_cond.release() + while self._operational: + self._cb_cond.acquire() + try: + self._cb_cond.wait(1) + while self._do_console_events(): + pass + finally: + self._cb_cond.release() + logging.debug("Shutting down Console thread") def start_console_events(self): @@ -1147,39 +1183,47 @@ class Console(Thread): self._cb_cond.release() - def do_console_events(self): + def _do_console_events(self): ''' - Called by Broker proxy to poll for Console events. Passes the events - onto the ConsoleHandler associated with this Console. + Called by the Console thread to poll for events. Passes the events + onto the ConsoleHandler associated with this Console. Is called + periodically, but can also be kicked by Console.start_console_events(). ''' count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 - # print "Console Event:", self._event.kind if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: + logging.debug("Console Event AGENT_ADDED received") if self._handler: self._handler.agent_added(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: + logging.debug("Console Event AGENT_DELETED received") if self._handler: self._handler.agent_deleted(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: + logging.debug("Console Event NEW_PACKAGE received") if self._handler: self._handler.new_package(self._event.name) elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: + logging.debug("Console Event NEW_CLASS received") if self._handler: self._handler.new_class(SchemaClassKey(self._event.classKey)) elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: + logging.debug("Console Event OBJECT_UPDATE received") if self._handler: self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), self._event.hasProps, self._event.hasStats) elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: - pass + logging.debug("Console Event EVENT_RECEIVED received") elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: + logging.debug("Console Event AGENT_HEARTBEAT received") if self._handler: self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: - pass + logging.debug("Console Event METHOD_RESPONSE received") + else: + logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) self.impl.popEvent() valid = self.impl.getEvent(self._event) @@ -1210,7 +1254,6 @@ class AgentProxy: class Broker(ConnectionHandler): # attr_reader :impl :conn, :console, :broker_bank def __init__(self, console, conn): - ConnectionHandler.__init__(self) self.broker_bank = 1 self.console = console self.conn = conn @@ -1226,9 +1269,17 @@ class Broker(ConnectionHandler): def shutdown(self): + logging.debug("broker.shutdown() called.") self.console.impl.delConnection(self.impl) self.conn.del_conn_handler(self) + if self._session: + self.impl.sessionClosed() + logging.debug("broker.shutdown() sessionClosed done.") + self._session.destroy() + logging.debug("broker.shutdown() session destroy done.") + self._session = None self._operational = False + logging.debug("broker.shutdown() done.") def wait_for_stable(self, timeout = None): @@ -1252,26 +1303,31 @@ class Broker(ConnectionHandler): self.conn.kick() - def do_broker_events(self): + def _do_broker_events(self): count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 - # print "Broker Event: ", self._event.kind if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: - pass + logging.debug("Broker Event BROKER_INFO received"); elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: + logging.debug("Broker Event DECLARE_QUEUE received"); self.conn.impl.declareQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: + logging.debug("Broker Event DELETE_QUEUE received"); self.conn.impl.deleteQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.BIND: + logging.debug("Broker Event BIND received"); self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.UNBIND: + logging.debug("Broker Event UNBIND received"); self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: + logging.debug("Broker Event SETUP_COMPLETE received"); self.impl.startProtocol() elif self._event.kind == qmfengine.BrokerEvent.STABLE: - self_.cv.acquire() + logging.debug("Broker Event STABLE received"); + self._cv.acquire() try: self._stable = True self._cv.notify() @@ -1292,11 +1348,12 @@ class Broker(ConnectionHandler): return count - def do_broker_messages(self): + def _do_broker_messages(self): count = 0 valid = self.impl.getXmtMessage(self._xmtMessage) while valid: count += 1 + logging.debug("Broker: sending msg on connection") self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) self.impl.popXmt() valid = self.impl.getXmtMessage(self._xmtMessage) @@ -1304,41 +1361,42 @@ class Broker(ConnectionHandler): return count - def do_events(self): + def _do_events(self): while True: self.console.start_console_events() - bcnt = do_broker_events() - mcnt = do_broker_messages() + bcnt = self._do_broker_events() + mcnt = self._do_broker_messages() if bcnt == 0 and mcnt == 0: break; def conn_event_connected(self): - print "Console Connection Established..." + logging.debug("Broker: Connection event CONNECTED") self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) self.impl.sessionOpened(self._session.handle) - self.do_events() + self._do_events() def conn_event_disconnected(self, error): - print "Console Connection Lost" + logging.debug("Broker: Connection event DISCONNECTED") pass def conn_event_visit(self): - self.do_events() + self._do_events() def sess_event_session_closed(self, context, error): - print "Console Session Lost" + logging.debug("Broker: Session event CLOSED") self.impl.sessionClosed() def sess_event_recv(self, context, message): + logging.debug("Broker: Session event MSG_RECV") if not self._operational: - print "Unexpected RECV Event" + logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) self.impl.handleRcvMessage(message) - self.do_events() + self._do_events() @@ -1454,7 +1512,7 @@ class Agent(ConnectionHandler): def conn_event_connected(self): - print "Agent Connection Established..." + logging.debug("Agent Connection Established...") self._session = Session(self._conn, "qmfa-%s.%d" % (socket.gethostname(), os.getpid()), self) @@ -1463,7 +1521,7 @@ class Agent(ConnectionHandler): def conn_event_disconnected(self, error): - print "Agent Connection Lost" + logging.debug("Agent Connection Lost") pass @@ -1472,7 +1530,7 @@ class Agent(ConnectionHandler): def sess_event_session_closed(self, context, error): - print "Agent Session Lost" + logging.debug("Agent Session Lost") pass |