summaryrefslogtreecommitdiff
path: root/python/qpid/connection010.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/connection010.py')
-rw-r--r--python/qpid/connection010.py30
1 files changed, 18 insertions, 12 deletions
diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py
index b25efd37a8..b17ceea534 100644
--- a/python/qpid/connection010.py
+++ b/python/qpid/connection010.py
@@ -18,7 +18,8 @@
#
import datatypes, session
-from threading import Thread, Event, RLock
+from threading import Thread, Condition, RLock
+from util import wait
from framer import Closed
from assembler import Assembler, Segment
from codec010 import StringCodec
@@ -47,16 +48,21 @@ class Connection(Assembler):
Assembler.__init__(self, sock)
self.spec = spec
self.track = self.spec["track"]
- self.delegate = delegate(self)
+
+ self.lock = RLock()
self.attached = {}
self.sessions = {}
- self.lock = RLock()
+
+ self.condition = Condition()
+ self.opened = False
+
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
- self.opened = Event()
- self.closed = Event()
+
self.channel_max = 65535
+ self.delegate = delegate(self)
+
def attach(self, name, ch, delegate, force=False):
self.lock.acquire()
try:
@@ -104,12 +110,13 @@ class Connection(Assembler):
def session(self, name, timeout=None, delegate=session.client):
self.lock.acquire()
try:
- ssn = self.attach(name, Channel(self, self.__channel()), delegate)
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
ssn.channel.session_attach(name)
- ssn.opened.wait(timeout)
- if ssn.opened.isSet():
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
return ssn
else:
+ self.detach(name, ch)
raise Timeout()
finally:
self.lock.release()
@@ -117,8 +124,7 @@ class Connection(Assembler):
def start(self, timeout=None):
self.delegate.start()
self.thread.start()
- self.opened.wait(timeout=timeout)
- if not self.opened.isSet():
+ if not wait(self.condition, lambda: self.opened, timeout):
raise Timeout()
def run(self):
@@ -132,9 +138,9 @@ class Connection(Assembler):
self.delegate.received(seg)
def close(self, timeout=None):
+ if not self.opened: return
Channel(self, 0).connection_close()
- self.closed.wait(timeout=timeout)
- if not self.closed.isSet():
+ if not wait(self.condition, lambda: not self.opened, timeout):
raise Timeout()
self.thread.join(timeout=timeout)