diff options
Diffstat (limited to 'python/qpid/connection010.py')
-rw-r--r-- | python/qpid/connection010.py | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py index b25efd37a8..b17ceea534 100644 --- a/python/qpid/connection010.py +++ b/python/qpid/connection010.py @@ -18,7 +18,8 @@ # import datatypes, session -from threading import Thread, Event, RLock +from threading import Thread, Condition, RLock +from util import wait from framer import Closed from assembler import Assembler, Segment from codec010 import StringCodec @@ -47,16 +48,21 @@ class Connection(Assembler): Assembler.__init__(self, sock) self.spec = spec self.track = self.spec["track"] - self.delegate = delegate(self) + + self.lock = RLock() self.attached = {} self.sessions = {} - self.lock = RLock() + + self.condition = Condition() + self.opened = False + self.thread = Thread(target=self.run) self.thread.setDaemon(True) - self.opened = Event() - self.closed = Event() + self.channel_max = 65535 + self.delegate = delegate(self) + def attach(self, name, ch, delegate, force=False): self.lock.acquire() try: @@ -104,12 +110,13 @@ class Connection(Assembler): def session(self, name, timeout=None, delegate=session.client): self.lock.acquire() try: - ssn = self.attach(name, Channel(self, self.__channel()), delegate) + ch = Channel(self, self.__channel()) + ssn = self.attach(name, ch, delegate) ssn.channel.session_attach(name) - ssn.opened.wait(timeout) - if ssn.opened.isSet(): + if wait(ssn.condition, lambda: ssn.channel is not None, timeout): return ssn else: + self.detach(name, ch) raise Timeout() finally: self.lock.release() @@ -117,8 +124,7 @@ class Connection(Assembler): def start(self, timeout=None): self.delegate.start() self.thread.start() - self.opened.wait(timeout=timeout) - if not self.opened.isSet(): + if not wait(self.condition, lambda: self.opened, timeout): raise Timeout() def run(self): @@ -132,9 +138,9 @@ class Connection(Assembler): self.delegate.received(seg) def close(self, timeout=None): + if not self.opened: return Channel(self, 0).connection_close() - self.closed.wait(timeout=timeout) - if not self.closed.isSet(): + if not wait(self.condition, lambda: not self.opened, timeout): raise Timeout() self.thread.join(timeout=timeout) |