summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-08-21 22:21:49 +0000
committerRafael H. Schloming <rhs@apache.org>2009-08-21 22:21:49 +0000
commit5f1d73e5cc7bd32fb0c3bfe5a296138cc7ffa30b (patch)
tree724cf3cf0711bb3c12c89fd0deea8a2446898582
parent2ad1a5da3732b280d9b780313969bef4da05f113 (diff)
downloadqpid-python-5f1d73e5cc7bd32fb0c3bfe5a296138cc7ffa30b.tar.gz
fixed some more channel attribute errors; eliminated most uses of catchup in favor of waiting on semantically meaningful predicates; fixed Serial.__cmp__ to check the type of the other object
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@806734 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/qpid/datatypes.py2
-rw-r--r--python/qpid/messaging.py58
-rw-r--r--python/qpid/session.py7
3 files changed, 42 insertions, 25 deletions
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 444856776e..f832ddae34 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -132,7 +132,7 @@ class Serial:
return hash(self.value)
def __cmp__(self, other):
- if other is None:
+ if other.__class__ not in (int, long, Serial):
return 1
other = serial(other)
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index b66f681dcd..8a7a27d275 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -150,6 +150,7 @@ class Connection(Lockable):
self.session_counter = 0
self.sessions = {}
self.reconnect = False
+ self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
self._modcount = Serial(0)
@@ -170,6 +171,11 @@ class Connection(Lockable):
if self.error:
raise exc(*self.error)
+ def ewait(self, predicate, timeout=None, exc=ConnectionError):
+ result = self.wait(lambda: self.error or predicate(), timeout)
+ self.check_error(exc)
+ return result
+
@synchronized
def session(self, name=None, transactional=False):
"""
@@ -208,7 +214,7 @@ class Connection(Lockable):
"""
self._connected = True
self.wakeup()
- self.catchup(ConnectError)
+ self.ewait(lambda: self._driver._connected, exc=ConnectError)
@synchronized
def disconnect(self):
@@ -217,7 +223,7 @@ class Connection(Lockable):
"""
self._connected = False
self.wakeup()
- self.catchup()
+ self.ewait(lambda: not self._driver._connected)
@synchronized
def connected(self):
@@ -345,6 +351,9 @@ class Session(Lockable):
def check_error(self, exc=SessionError):
self.connection.check_error(exc)
+ def ewait(self, predicate, timeout=None, exc=SessionError):
+ return self.connection.ewait(predicate, timeout, exc)
+
@synchronized
def sender(self, target):
"""
@@ -445,8 +454,7 @@ class Session(Lockable):
raise NontransactionalSession()
self.committing = True
self.wakeup()
- self.catchup()
- assert not self.committing
+ self.ewait(lambda: not self.committing)
if self.aborted:
raise TransactionAborted()
assert self.committed
@@ -461,8 +469,8 @@ class Session(Lockable):
raise NontransactionalSession()
self.aborting = True
self.wakeup()
- self.catchup()
- assert not self.aborting and self.aborted
+ self.ewait(lambda: not self.aborting)
+ assert self.aborted
@synchronized
def start(self):
@@ -560,6 +568,9 @@ class Sender(Lockable):
def check_error(self, exc=SendError):
self.session.check_error(exc)
+ def ewait(self, predicate, timeout=None, exc=SendError):
+ return self.session.ewait(predicate, timeout, exc)
+
@synchronized
def send(self, object):
"""
@@ -585,8 +596,7 @@ class Sender(Lockable):
self.session.outgoing.append(message)
self.wakeup()
- self.catchup()
- assert message not in self.session.outgoing
+ self.ewait(lambda: message not in self.session.outgoing)
@synchronized
def close(self):
@@ -626,11 +636,12 @@ class Receiver(Lockable):
self.started = started
self.capacity = UNLIMITED
self.granted = Serial(0)
- self.drain = False
+ self.draining = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
+ self.closing = False
self.closed = False
self.listener = None
self._lock = self.session._lock
@@ -645,6 +656,9 @@ class Receiver(Lockable):
def check_error(self, exc=ReceiveError):
self.session.check_error(exc)
+ def ewait(self, predicate, timeout=None, exc=ReceiveError):
+ return self.session.ewait(predicate, timeout, exc)
+
@synchronized
def pending(self):
return self.received - self.returned
@@ -683,15 +697,13 @@ class Receiver(Lockable):
if self._capacity() == 0:
self.granted = self.returned + 1
self.wakeup()
- self.catchup()
- assert self.impending == self.granted
+ self.ewait(lambda: self.impending == self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.drain = True
+ self.draining = True
self.wakeup()
- self.catchup()
+ self.ewait(lambda: not self.draining)
assert self.granted == self.received
- self.drain = False
if self.capacity is not UNLIMITED:
self.granted += self._capacity()
self.wakeup()
@@ -723,18 +735,17 @@ class Receiver(Lockable):
self.granted = self.received
self.started = False
self.wakeup()
- self.catchup()
- assert self.granted == self.received
+ self.ewait(lambda: self.impending == self.received)
@synchronized
def close(self):
"""
Close the receiver.
"""
- self.closed = True
+ self.closing = True
self.wakeup()
try:
- self.catchup()
+ self.ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -837,6 +848,7 @@ class Driver(Lockable):
self._wakeup_cond = Condition()
self._socket = None
self._conn = None
+ self._connected = False
self._attachments = {}
self._modcount = self.connection._modcount
self.thread = Thread(target=self.run)
@@ -909,6 +921,7 @@ class Driver(Lockable):
self._conn = connection.Connection(self._socket)
try:
self._conn.start(timeout=10)
+ self._connected = True
except connection.VersionError, e:
raise ConnectError(e)
except Timeout:
@@ -921,13 +934,13 @@ class Driver(Lockable):
def reset(self):
self._conn = None
+ self._connected = False
self._attachments.clear()
for ssn in self.connection.sessions.values():
for m in ssn.acked + ssn.unacked + ssn.incoming:
m._transfer_id = None
for rcv in ssn.receivers:
rcv.impending = rcv.received
- rcv.returned = rcv.received
def connected(self):
return self._conn is not None
@@ -1006,11 +1019,12 @@ class Driver(Lockable):
# XXX: need to kill syncs
_ssn.sync()
- if rcv.closed:
+ if rcv.closing:
_ssn.message_cancel(rcv.destination, sync=True)
# XXX: need to kill syncs
_ssn.sync()
del self._attachments[rcv]
+ rcv.closed = True
def process(self, ssn):
if ssn.closing: return
@@ -1116,12 +1130,13 @@ class Driver(Lockable):
self.grant(rcv)
- if rcv.drain:
+ if rcv.draining:
_ssn.message_flush(rcv.destination, sync=True)
# XXX: really need to make this async so that we don't give up the lock
_ssn.sync()
rcv.granted = rcv.received
rcv.impending = rcv.received
+ rcv.draining = False
def send(self, snd, msg):
_ssn = self._attachments[snd.session]
@@ -1201,4 +1216,3 @@ class Driver(Lockable):
__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
"Empty", "timestamp", "uuid4"]
-\
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 071d4dc817..2f1bd81bd4 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -244,13 +244,16 @@ class Sender:
self._completed = RangedSet()
def send(self, cmd):
+ ch = self.session.channel
+ if ch is None:
+ raise SessionDetached()
cmd.id = self.next_id
self.next_id += 1
if self.session.send_id:
self.session.send_id = False
- self.session.channel.session_command_point(cmd.id, 0)
+ ch.session_command_point(cmd.id, 0)
self.commands.append(cmd)
- self.session.channel.connection.write_op(cmd)
+ ch.connection.write_op(cmd)
def completed(self, commands):
idx = 0