summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
committerRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
commita91fe82ffdb32f0db050c8c071379281295e5ca8 (patch)
tree719c86b1b448ee9d7df40cd24ce1c4f9fe2b366b
parent485022ac7cd72b40cb4c99f2e27389d016a31371 (diff)
downloadqpid-python-a91fe82ffdb32f0db050c8c071379281295e5ca8.tar.gz
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654907 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/connection.py31
-rw-r--r--qpid/python/qpid/delegates.py17
-rw-r--r--qpid/python/qpid/exceptions.py1
-rw-r--r--qpid/python/qpid/framer.py3
-rw-r--r--qpid/python/qpid/queue.py21
-rw-r--r--qpid/python/qpid/session.py44
-rw-r--r--qpid/python/tests/connection.py68
7 files changed, 138 insertions, 47 deletions
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 4ed430249b..8d0e115458 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -19,8 +19,7 @@
import datatypes, session
from threading import Thread, Condition, RLock
-from util import wait
-from framer import Closed
+from util import wait, notify
from assembler import Assembler, Segment
from codec010 import StringCodec
from session import Session
@@ -39,11 +38,11 @@ class SessionBusy(Exception): pass
class ConnectionFailed(Exception): pass
-def client(*args):
- return delegates.Client(*args)
+def client(*args, **kwargs):
+ return delegates.Client(*args, **kwargs)
-def server(*args):
- return delegates.Server(*args)
+def server(*args, **kwargs):
+ return delegates.Server(*args, **kwargs)
class Connection(Assembler):
@@ -61,13 +60,14 @@ class Connection(Assembler):
self.condition = Condition()
self.opened = False
self.failed = False
+ self.close_code = (None, "connection aborted")
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
self.channel_max = 65535
- self.delegate = delegate(self, args)
+ self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
self.lock.acquire()
@@ -101,6 +101,8 @@ class Connection(Assembler):
ssn = self.sessions.pop(name, None)
if ssn is not None:
ssn.channel = None
+ ssn.closed()
+ notify(ssn.condition)
return ssn
finally:
self.lock.release()
@@ -127,13 +129,23 @@ class Connection(Assembler):
finally:
self.lock.release()
+ def detach_all(self):
+ self.lock.acquire()
+ try:
+ for ssn in self.attached.values():
+ if self.close_code[0] != 200:
+ ssn.exceptions.append(self.close_code)
+ self.detach(ssn.name, ssn.channel)
+ finally:
+ self.lock.release()
+
def start(self, timeout=None):
self.delegate.start()
self.thread.start()
if not wait(self.condition, lambda: self.opened or self.failed, timeout):
raise Timeout()
- if (self.failed):
- raise ConnectionFailed()
+ if self.failed:
+ raise ConnectionFailed(*self.close_code)
def run(self):
# XXX: we don't really have a good way to exit this loop without
@@ -142,6 +154,7 @@ class Connection(Assembler):
try:
seg = self.read_segment()
except Closed:
+ self.detach_all()
break
self.delegate.received(seg)
diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py
index cdff132219..6e6b55e36a 100644
--- a/qpid/python/qpid/delegates.py
+++ b/qpid/python/qpid/delegates.py
@@ -50,6 +50,7 @@ class Delegate:
ssn.received(seg)
def connection_close(self, ch, close):
+ self.connection.close_code = (close.reply_code, close.reply_text)
ch.connection_close_ok()
self.connection.sock.close()
if not self.connection.opened:
@@ -73,13 +74,11 @@ class Delegate:
notify(ch.session.condition)
def session_detach(self, ch, d):
- self.connection.detach(d.name, ch)
+ ssn = self.connection.detach(d.name, ch)
ch.session_detached(d.name)
def session_detached(self, ch, d):
- ssn = self.connection.detach(d.name, ch)
- if ssn is not None:
- notify(ch.session.condition)
+ self.connection.detach(d.name, ch)
def session_command_point(self, ch, cp):
ssn = ch.session
@@ -127,11 +126,11 @@ class Client(Delegate):
"version": "development",
"platform": os.name}
- def __init__(self, connection, args={}):
- Delegate.__init__(self, connection)
- self.username = args.get('username', 'guest')
- self.password = args.get('password', 'guest')
- self.mechanism = args.get('mechanism', 'PLAIN')
+ def __init__(self, connection, username="guest", password="guest", mechanism="PLAIN"):
+ Delegate.__init__(self, connection)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
def start(self):
self.connection.write_header(self.spec.major, self.spec.minor)
diff --git a/qpid/python/qpid/exceptions.py b/qpid/python/qpid/exceptions.py
index 2136793d3b..7eaaf81ed4 100644
--- a/qpid/python/qpid/exceptions.py
+++ b/qpid/python/qpid/exceptions.py
@@ -17,4 +17,5 @@
# under the License.
#
+class Closed(Exception): pass
class Timeout(Exception): pass
diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py
index 78a29235cb..27ea3287f0 100644
--- a/qpid/python/qpid/framer.py
+++ b/qpid/python/qpid/framer.py
@@ -18,6 +18,7 @@
#
import struct, socket
+from exceptions import Closed
from packer import Packer
from threading import Lock
from logging import getLogger
@@ -66,8 +67,6 @@ class Frame:
self.channel,
self.payload)
-class Closed(Exception): pass
-
class FramingError(Exception): pass
class Framer(Packer):
diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py
index ea8f00d091..a8a5c0d9ad 100644
--- a/qpid/python/qpid/queue.py
+++ b/qpid/python/qpid/queue.py
@@ -25,8 +25,7 @@ content of a queue can be notified if the queue is no longer in use.
from Queue import Queue as BaseQueue, Empty, Full
from threading import Thread
-
-class Closed(Exception): pass
+from exceptions import Closed
class Queue(BaseQueue):
@@ -37,6 +36,7 @@ class Queue(BaseQueue):
BaseQueue.__init__(self, *args, **kwargs)
self.error = None
self.listener = None
+ self.exc_listener = None
self.thread = None
def close(self, error = None):
@@ -53,15 +53,20 @@ class Queue(BaseQueue):
else:
return result
- def listen(self, listener):
+ def listen(self, listener, exc_listener = None):
+ if listener is None and exc_listener is not None:
+ raise ValueError("cannot set exception listener without setting listener")
+
self.listener = listener
- if listener == None:
- if self.thread != None:
+ self.exc_listener = exc_listener
+
+ if listener is None:
+ if self.thread is not None:
self.put(Queue.STOP)
self.thread.join()
self.thread = None
else:
- if self.thread == None:
+ if self.thread is None:
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -72,5 +77,7 @@ class Queue(BaseQueue):
o = self.get()
if o == Queue.STOP: break
self.listener(o)
- except Closed:
+ except Closed, e:
+ if self.exc_listener is not None:
+ self.exc_listener(e)
break
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py
index f8ac98b96e..a1103e0428 100644
--- a/qpid/python/qpid/session.py
+++ b/qpid/python/qpid/session.py
@@ -52,7 +52,8 @@ class Session(Invoker):
self.timeout = timeout
self.channel = None
self.invoke_lock = Lock()
- self.closed = False
+ self._closing = False
+ self._closed = False
self.condition = Condition()
@@ -82,7 +83,9 @@ class Session(Invoker):
def error(self):
exc = self.exceptions[:]
- if len(exc) == 1:
+ if len(exc) == 0:
+ return None
+ elif len(exc) == 1:
return exc[0]
else:
return tuple(exc)
@@ -102,13 +105,31 @@ class Session(Invoker):
def close(self, timeout=None):
self.invoke_lock.acquire()
try:
- self.closed = True
+ self._closing = True
self.channel.session_detach(self.name)
finally:
self.invoke_lock.release()
if not wait(self.condition, lambda: self.channel is None, timeout):
raise Timeout()
+ def closed(self):
+ self.lock.acquire()
+ try:
+ if self._closed: return
+ self._closed = True
+
+ error = self.error()
+ for id in self.results:
+ f = self.results[id]
+ f.error(error)
+ self.results.clear()
+
+ for q in self._incoming.values():
+ q.close(error)
+ notify(self.condition)
+ finally:
+ self.lock.release()
+
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
if cmd is not None and cmd.track == self.spec["track.command"].value:
@@ -136,7 +157,7 @@ class Session(Invoker):
self.invoke_lock.release()
def do_invoke(self, type, args, kwargs):
- if self.closed:
+ if self._closing:
raise SessionClosed()
if self.channel == None:
@@ -311,20 +332,7 @@ class Delegate:
future.set(er.value)
def execution_exception(self, ex):
- self.session.lock.acquire()
- try:
- self.session.exceptions.append(ex)
- error = self.session.error()
- for id in self.session.results:
- f = self.session.results[id]
- f.error(error)
- self.session.results.clear()
-
- for q in self.session._incoming.values():
- q.close(error)
- notify(self.session.condition)
- finally:
- self.session.lock.release()
+ self.session.exceptions.append(ex)
class Client(Delegate):
diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py
index 6925480ed3..88620bc1c6 100644
--- a/qpid/python/tests/connection.py
+++ b/qpid/python/tests/connection.py
@@ -51,8 +51,16 @@ class TestSession(Delegate):
def queue_query(self, qq):
return qq._type.result.type.new((qq.queue,), {})
- def message_transfer(self, cmd, header, body):
- self.queue.put((cmd, header, body))
+ def message_transfer(self, cmd, headers, body):
+ if cmd.destination == "echo":
+ m = Message(body)
+ m.headers = headers
+ self.session.message_transfer(cmd.destination, cmd.accept_mode,
+ cmd.acquire_mode, m)
+ elif cmd.destination == "abort":
+ self.session.channel.connection.sock.close()
+ else:
+ self.queue.put((cmd, headers, body))
class ConnectionTest(TestCase):
@@ -134,3 +142,59 @@ class ConnectionTest(TestCase):
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"
c.close(5)
+
+ def testCloseGet(self):
+ c = Connection(connect("0.0.0.0", PORT), self.spec)
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ for i in range(10):
+ m = echos.get(timeout=10)
+ assert m.body == "test%d" % i
+
+ try:
+ m = echos.get(timeout=10)
+ assert False
+ except Closed, e:
+ pass
+
+ def testCloseListen(self):
+ c = Connection(connect("0.0.0.0", PORT), self.spec)
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ messages = []
+ exceptions = []
+ condition = Condition()
+ def listener(m): messages.append(m)
+ def exc_listener(e):
+ exceptions.append(e)
+ condition.acquire()
+ condition.notify()
+ condition.release()
+
+ echos.listen(listener, exc_listener)
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ condition.acquire()
+ condition.wait(10)
+ condition.release()
+
+ for i in range(10):
+ m = messages.pop(0)
+ assert m.body == "test%d" % i
+
+ assert len(exceptions) == 1