diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-08-21 22:21:49 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-08-21 22:21:49 +0000 |
commit | 5f1d73e5cc7bd32fb0c3bfe5a296138cc7ffa30b (patch) | |
tree | 724cf3cf0711bb3c12c89fd0deea8a2446898582 | |
parent | 2ad1a5da3732b280d9b780313969bef4da05f113 (diff) | |
download | qpid-python-5f1d73e5cc7bd32fb0c3bfe5a296138cc7ffa30b.tar.gz |
fixed some more channel attribute errors; eliminated most uses of catchup in favor of waiting on semantically meaningful predicates; fixed Serial.__cmp__ to check the type of the other object
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@806734 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/datatypes.py | 2 | ||||
-rw-r--r-- | python/qpid/messaging.py | 58 | ||||
-rw-r--r-- | python/qpid/session.py | 7 |
3 files changed, 42 insertions, 25 deletions
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index 444856776e..f832ddae34 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -132,7 +132,7 @@ class Serial: return hash(self.value) def __cmp__(self, other): - if other is None: + if other.__class__ not in (int, long, Serial): return 1 other = serial(other) diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index b66f681dcd..8a7a27d275 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -150,6 +150,7 @@ class Connection(Lockable): self.session_counter = 0 self.sessions = {} self.reconnect = False + self._connected = False self._lock = RLock() self._condition = Condition(self._lock) self._modcount = Serial(0) @@ -170,6 +171,11 @@ class Connection(Lockable): if self.error: raise exc(*self.error) + def ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self.wait(lambda: self.error or predicate(), timeout) + self.check_error(exc) + return result + @synchronized def session(self, name=None, transactional=False): """ @@ -208,7 +214,7 @@ class Connection(Lockable): """ self._connected = True self.wakeup() - self.catchup(ConnectError) + self.ewait(lambda: self._driver._connected, exc=ConnectError) @synchronized def disconnect(self): @@ -217,7 +223,7 @@ class Connection(Lockable): """ self._connected = False self.wakeup() - self.catchup() + self.ewait(lambda: not self._driver._connected) @synchronized def connected(self): @@ -345,6 +351,9 @@ class Session(Lockable): def check_error(self, exc=SessionError): self.connection.check_error(exc) + def ewait(self, predicate, timeout=None, exc=SessionError): + return self.connection.ewait(predicate, timeout, exc) + @synchronized def sender(self, target): """ @@ -445,8 +454,7 @@ class Session(Lockable): raise NontransactionalSession() self.committing = True self.wakeup() - self.catchup() - assert not self.committing + self.ewait(lambda: not self.committing) if self.aborted: raise TransactionAborted() assert self.committed @@ -461,8 +469,8 @@ class Session(Lockable): raise NontransactionalSession() self.aborting = True self.wakeup() - self.catchup() - assert not self.aborting and self.aborted + self.ewait(lambda: not self.aborting) + assert self.aborted @synchronized def start(self): @@ -560,6 +568,9 @@ class Sender(Lockable): def check_error(self, exc=SendError): self.session.check_error(exc) + def ewait(self, predicate, timeout=None, exc=SendError): + return self.session.ewait(predicate, timeout, exc) + @synchronized def send(self, object): """ @@ -585,8 +596,7 @@ class Sender(Lockable): self.session.outgoing.append(message) self.wakeup() - self.catchup() - assert message not in self.session.outgoing + self.ewait(lambda: message not in self.session.outgoing) @synchronized def close(self): @@ -626,11 +636,12 @@ class Receiver(Lockable): self.started = started self.capacity = UNLIMITED self.granted = Serial(0) - self.drain = False + self.draining = False self.impending = Serial(0) self.received = Serial(0) self.returned = Serial(0) + self.closing = False self.closed = False self.listener = None self._lock = self.session._lock @@ -645,6 +656,9 @@ class Receiver(Lockable): def check_error(self, exc=ReceiveError): self.session.check_error(exc) + def ewait(self, predicate, timeout=None, exc=ReceiveError): + return self.session.ewait(predicate, timeout, exc) + @synchronized def pending(self): return self.received - self.returned @@ -683,15 +697,13 @@ class Receiver(Lockable): if self._capacity() == 0: self.granted = self.returned + 1 self.wakeup() - self.catchup() - assert self.impending == self.granted + self.ewait(lambda: self.impending == self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self.drain = True + self.draining = True self.wakeup() - self.catchup() + self.ewait(lambda: not self.draining) assert self.granted == self.received - self.drain = False if self.capacity is not UNLIMITED: self.granted += self._capacity() self.wakeup() @@ -723,18 +735,17 @@ class Receiver(Lockable): self.granted = self.received self.started = False self.wakeup() - self.catchup() - assert self.granted == self.received + self.ewait(lambda: self.impending == self.received) @synchronized def close(self): """ Close the receiver. """ - self.closed = True + self.closing = True self.wakeup() try: - self.catchup() + self.ewait(lambda: self.closed) finally: self.session.receivers.remove(self) @@ -837,6 +848,7 @@ class Driver(Lockable): self._wakeup_cond = Condition() self._socket = None self._conn = None + self._connected = False self._attachments = {} self._modcount = self.connection._modcount self.thread = Thread(target=self.run) @@ -909,6 +921,7 @@ class Driver(Lockable): self._conn = connection.Connection(self._socket) try: self._conn.start(timeout=10) + self._connected = True except connection.VersionError, e: raise ConnectError(e) except Timeout: @@ -921,13 +934,13 @@ class Driver(Lockable): def reset(self): self._conn = None + self._connected = False self._attachments.clear() for ssn in self.connection.sessions.values(): for m in ssn.acked + ssn.unacked + ssn.incoming: m._transfer_id = None for rcv in ssn.receivers: rcv.impending = rcv.received - rcv.returned = rcv.received def connected(self): return self._conn is not None @@ -1006,11 +1019,12 @@ class Driver(Lockable): # XXX: need to kill syncs _ssn.sync() - if rcv.closed: + if rcv.closing: _ssn.message_cancel(rcv.destination, sync=True) # XXX: need to kill syncs _ssn.sync() del self._attachments[rcv] + rcv.closed = True def process(self, ssn): if ssn.closing: return @@ -1116,12 +1130,13 @@ class Driver(Lockable): self.grant(rcv) - if rcv.drain: + if rcv.draining: _ssn.message_flush(rcv.destination, sync=True) # XXX: really need to make this async so that we don't give up the lock _ssn.sync() rcv.granted = rcv.received rcv.impending = rcv.received + rcv.draining = False def send(self, snd, msg): _ssn = self._attachments[snd.session] @@ -1201,4 +1216,3 @@ class Driver(Lockable): __all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", "Empty", "timestamp", "uuid4"] -\ diff --git a/python/qpid/session.py b/python/qpid/session.py index 071d4dc817..2f1bd81bd4 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -244,13 +244,16 @@ class Sender: self._completed = RangedSet() def send(self, cmd): + ch = self.session.channel + if ch is None: + raise SessionDetached() cmd.id = self.next_id self.next_id += 1 if self.session.send_id: self.session.send_id = False - self.session.channel.session_command_point(cmd.id, 0) + ch.session_command_point(cmd.id, 0) self.commands.append(cmd) - self.session.channel.connection.write_op(cmd) + ch.connection.write_op(cmd) def completed(self, commands): idx = 0 |