summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-10-23 18:11:42 +0000
committerTed Ross <tross@apache.org>2009-10-23 18:11:42 +0000
commitc677d6ffec635f28e3e48255cfca3508a13b40ae (patch)
treed853652d1f2b631c71de8036f6de954dc34b201c
parent817ea4b82aad3b7f48994b40708a0b9a87b2be81 (diff)
downloadqpid-python-c677d6ffec635f28e3e48255cfca3508a13b40ae.tar.gz
QPID-2156 - Add thread shutdown to python QMF bindings, additional logging, native console test.
Committed patch from Ken Giusti. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829161 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/bindings/qmf/python/qmf.py140
1 files changed, 99 insertions, 41 deletions
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py
index 2064eb85cf..871a25e207 100644
--- a/cpp/bindings/qmf/python/qmf.py
+++ b/cpp/bindings/qmf/python/qmf.py
@@ -19,6 +19,7 @@
import sys
import socket
import os
+import logging
from threading import Thread
from threading import RLock
from threading import Condition
@@ -109,8 +110,19 @@ class Connection(Thread):
self._conn_handlers_to_delete = []
self._conn_handlers = []
self._connected = False
+ self._operational = True
self.start()
-
+
+
+ def destroy(self, timeout=None):
+ logging.debug("Destroying Connection...")
+ self._operational = False
+ self.kick()
+ self.join(timeout)
+ logging.debug("... Conn Destroyed!" )
+ if self.isAlive():
+ logging.error("Error: Connection thread '%s' is hung..." % self.getName())
+
def connected(self):
return self._connected
@@ -145,8 +157,8 @@ class Connection(Thread):
del_handlers = []
bt_count = 0
- while True:
- # print "Waiting for socket data"
+ while self._operational:
+ logging.debug("Connection thread waiting for socket data...")
self._sock.recv(1)
self._lock.acquire()
@@ -173,24 +185,31 @@ class Connection(Thread):
while valid:
try:
if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED:
+ logging.debug("Connection thread: CONNECTED event received.")
self._connected = True
for h in self._conn_handlers:
h.conn_event_connected()
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED:
+ logging.debug("Connection thread: DISCONNECTED event received.")
self._connected = False
for h in self._conn_handlers:
h.conn_event_disconnected(eventImpl.errorText)
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED:
+ logging.debug("Connection thread: SESSION_CLOSED event received.")
eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV:
+ logging.debug("Connection thread: RECV event received.")
eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
+ else:
+ logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind))
except:
import traceback
- print "Event Exception:", sys.exc_info()
+ logging.error( "Exception occurred during Connection event processing:" )
+ logging.error( str(sys.exc_info()) )
if bt_count < 2:
traceback.print_exc()
traceback.print_stack()
@@ -202,6 +221,8 @@ class Connection(Thread):
for h in self._conn_handlers:
h.conn_event_visit()
+ logging.debug("Shutting down Connection thread")
+
class Session:
@@ -296,7 +317,7 @@ class QmfObject(object):
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported type for get_attr?", val.getType()
+ logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
return None
@@ -329,7 +350,7 @@ class QmfObject(object):
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported type for get_attr?", val.getType()
+ logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
return None
@@ -626,7 +647,7 @@ class Arguments(object):
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported Type for Get?", val.getType()
+ logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
return None
@@ -663,7 +684,7 @@ class Arguments(object):
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported Type for Set?", val.getType()
+ logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
return None
@@ -960,8 +981,19 @@ class Console(Thread):
self._sync_result = None
self._select = {}
self._cb_cond = Condition()
+ self._operational = True
self.start()
+
+ def destroy(self, timeout=None):
+ logging.debug("Destroying Console...")
+ self._operational = False
+ self.start_console_events() # wake thread up
+ self.join(timeout)
+ logging.debug("... Console Destroyed!")
+ if self.isAlive():
+ logging.error( "Console thread '%s' is hung..." % self.getName() )
+
def add_connection(self, conn):
broker = Broker(self, conn)
@@ -974,12 +1006,15 @@ class Console(Thread):
def del_connection(self, broker):
+ logging.debug("shutting down broker...")
broker.shutdown()
+ logging.debug("...broker down.")
self._cv.acquire()
try:
self._broker_list.remove(broker)
finally:
self._cv.release()
+ logging.debug("del_connection() finished")
def packages(self):
@@ -1129,14 +1164,15 @@ class Console(Thread):
def run(self):
- while True:
- self._cb_cond.acquire()
- try:
- self._cb_cond.wait(1)
- while self.do_console_events():
- pass
- finally:
- self._cb_cond.release()
+ while self._operational:
+ self._cb_cond.acquire()
+ try:
+ self._cb_cond.wait(1)
+ while self._do_console_events():
+ pass
+ finally:
+ self._cb_cond.release()
+ logging.debug("Shutting down Console thread")
def start_console_events(self):
@@ -1147,39 +1183,47 @@ class Console(Thread):
self._cb_cond.release()
- def do_console_events(self):
+ def _do_console_events(self):
'''
- Called by Broker proxy to poll for Console events. Passes the events
- onto the ConsoleHandler associated with this Console.
+ Called by the Console thread to poll for events. Passes the events
+ onto the ConsoleHandler associated with this Console. Is called
+ periodically, but can also be kicked by Console.start_console_events().
'''
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- # print "Console Event:", self._event.kind
if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+ logging.debug("Console Event AGENT_ADDED received")
if self._handler:
self._handler.agent_added(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+ logging.debug("Console Event AGENT_DELETED received")
if self._handler:
self._handler.agent_deleted(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+ logging.debug("Console Event NEW_PACKAGE received")
if self._handler:
self._handler.new_package(self._event.name)
elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+ logging.debug("Console Event NEW_CLASS received")
if self._handler:
self._handler.new_class(SchemaClassKey(self._event.classKey))
elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+ logging.debug("Console Event OBJECT_UPDATE received")
if self._handler:
self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
self._event.hasProps, self._event.hasStats)
elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
- pass
+ logging.debug("Console Event EVENT_RECEIVED received")
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+ logging.debug("Console Event AGENT_HEARTBEAT received")
if self._handler:
self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
- pass
+ logging.debug("Console Event METHOD_RESPONSE received")
+ else:
+ logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
self.impl.popEvent()
valid = self.impl.getEvent(self._event)
@@ -1210,7 +1254,6 @@ class AgentProxy:
class Broker(ConnectionHandler):
# attr_reader :impl :conn, :console, :broker_bank
def __init__(self, console, conn):
- ConnectionHandler.__init__(self)
self.broker_bank = 1
self.console = console
self.conn = conn
@@ -1226,9 +1269,17 @@ class Broker(ConnectionHandler):
def shutdown(self):
+ logging.debug("broker.shutdown() called.")
self.console.impl.delConnection(self.impl)
self.conn.del_conn_handler(self)
+ if self._session:
+ self.impl.sessionClosed()
+ logging.debug("broker.shutdown() sessionClosed done.")
+ self._session.destroy()
+ logging.debug("broker.shutdown() session destroy done.")
+ self._session = None
self._operational = False
+ logging.debug("broker.shutdown() done.")
def wait_for_stable(self, timeout = None):
@@ -1252,26 +1303,31 @@ class Broker(ConnectionHandler):
self.conn.kick()
- def do_broker_events(self):
+ def _do_broker_events(self):
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- # print "Broker Event: ", self._event.kind
if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
- pass
+ logging.debug("Broker Event BROKER_INFO received");
elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+ logging.debug("Broker Event DECLARE_QUEUE received");
self.conn.impl.declareQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+ logging.debug("Broker Event DELETE_QUEUE received");
self.conn.impl.deleteQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.BIND:
+ logging.debug("Broker Event BIND received");
self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+ logging.debug("Broker Event UNBIND received");
self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+ logging.debug("Broker Event SETUP_COMPLETE received");
self.impl.startProtocol()
elif self._event.kind == qmfengine.BrokerEvent.STABLE:
- self_.cv.acquire()
+ logging.debug("Broker Event STABLE received");
+ self._cv.acquire()
try:
self._stable = True
self._cv.notify()
@@ -1292,11 +1348,12 @@ class Broker(ConnectionHandler):
return count
- def do_broker_messages(self):
+ def _do_broker_messages(self):
count = 0
valid = self.impl.getXmtMessage(self._xmtMessage)
while valid:
count += 1
+ logging.debug("Broker: sending msg on connection")
self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
self.impl.popXmt()
valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -1304,41 +1361,42 @@ class Broker(ConnectionHandler):
return count
- def do_events(self):
+ def _do_events(self):
while True:
self.console.start_console_events()
- bcnt = do_broker_events()
- mcnt = do_broker_messages()
+ bcnt = self._do_broker_events()
+ mcnt = self._do_broker_messages()
if bcnt == 0 and mcnt == 0:
break;
def conn_event_connected(self):
- print "Console Connection Established..."
+ logging.debug("Broker: Connection event CONNECTED")
self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
self.impl.sessionOpened(self._session.handle)
- self.do_events()
+ self._do_events()
def conn_event_disconnected(self, error):
- print "Console Connection Lost"
+ logging.debug("Broker: Connection event DISCONNECTED")
pass
def conn_event_visit(self):
- self.do_events()
+ self._do_events()
def sess_event_session_closed(self, context, error):
- print "Console Session Lost"
+ logging.debug("Broker: Session event CLOSED")
self.impl.sessionClosed()
def sess_event_recv(self, context, message):
+ logging.debug("Broker: Session event MSG_RECV")
if not self._operational:
- print "Unexpected RECV Event"
+ logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
self.impl.handleRcvMessage(message)
- self.do_events()
+ self._do_events()
@@ -1454,7 +1512,7 @@ class Agent(ConnectionHandler):
def conn_event_connected(self):
- print "Agent Connection Established..."
+ logging.debug("Agent Connection Established...")
self._session = Session(self._conn,
"qmfa-%s.%d" % (socket.gethostname(), os.getpid()),
self)
@@ -1463,7 +1521,7 @@ class Agent(ConnectionHandler):
def conn_event_disconnected(self, error):
- print "Agent Connection Lost"
+ logging.debug("Agent Connection Lost")
pass
@@ -1472,7 +1530,7 @@ class Agent(ConnectionHandler):
def sess_event_session_closed(self, context, error):
- print "Agent Session Lost"
+ logging.debug("Agent Session Lost")
pass