diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-06-04 15:57:48 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-06-04 15:57:48 +0000 |
commit | 03e9ba8e51bae6344c73ba263d1c8a0c86dbbdb1 (patch) | |
tree | 6c6b19d78b00f961e71ec774003046c23df7f229 /python/qpid/messaging.py | |
parent | cad0c4d7ef0c341afe8eb0bd7f460803f274f263 (diff) | |
download | qpid-python-03e9ba8e51bae6344c73ba263d1c8a0c86dbbdb1.tar.gz |
Added commit and rollback to the Session API and streamlined some test utilities.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781786 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 64 |
1 files changed, 58 insertions, 6 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 84331134a9..05e2f7c51f 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -147,7 +147,7 @@ class Connection(Lockable): self._condition = Condition(self._lock) @synchronized - def session(self, name=None): + def session(self, name=None, transactional=False): """ Creates or retrieves the named session. If the name is omitted or None, then a unique name is chosen based on a randomly generated @@ -168,7 +168,7 @@ class Connection(Lockable): if self.sessions.has_key(name): return self.sessions[name] else: - ssn = Session(self, name, self.started) + ssn = Session(self, name, self.started, transactional=transactional) self.sessions[name] = ssn if self._conn is not None: ssn._attach() @@ -268,10 +268,11 @@ class Session(Lockable): messages, and manage various Senders and Receivers. """ - def __init__(self, connection, name, started): + def __init__(self, connection, name, started, transactional): self.connection = connection self.name = name self.started = started + self.transactional = transactional self._ssn = None self.senders = [] self.receivers = [] @@ -279,6 +280,8 @@ class Session(Lockable): self.incoming = [] self.closed = False self.unacked = [] + if self.transactional: + self.acked = [] self._lock = RLock() self._condition = Condition(self._lock) self.thread = Thread(target = self.run) @@ -294,6 +297,8 @@ class Session(Lockable): self._ssn.invoke_lock = self._lock self._ssn.lock = self._lock self._ssn.condition = self._condition + if self.transactional: + self._ssn.tx_select() for link in self.senders + self.receivers: link._link() @@ -414,17 +419,17 @@ class Session(Lockable): def acknowledge(self, message=None): """ Acknowledge the given L{Message}. If message is None, then all - unackednowledged messages on the session are acknowledged. + unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None """ if message is None: - messages = self.unacked + messages = self.unacked[:] else: messages = [message] - ids = RangedSet(*[m._transfer_id for m in self.unacked]) + ids = RangedSet(*[m._transfer_id for m in messages]) for range in ids: self._ssn.receiver._completed.add_range(range) self._ssn.channel.session_completed(self._ssn.receiver._completed) @@ -436,6 +441,46 @@ class Session(Lockable): self.unacked.remove(m) except ValueError: pass + if self.transactional: + self.acked.append(m) + + @synchronized + def commit(self): + """ + Commit outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + if self._ssn is None: + raise Disconnected() + self._ssn.tx_commit(sync=True) + del self.acked[:] + self._ssn.sync() + + @synchronized + def rollback(self): + """ + Rollback outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + if self._ssn is None: + raise Disconnected() + + ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming]) + for range in ids: + self._ssn.receiver._completed.add_range(range) + self._ssn.channel.session_completed(self._ssn.receiver._completed) + self._ssn.message_release(ids) + self._ssn.tx_rollback(sync=True) + + del self.incoming[:] + del self.unacked[:] + del self.acked[:] + + self._ssn.sync() @synchronized def start(self): @@ -515,6 +560,13 @@ class Disconnected(Exception): """ pass +class NontransactionalSession(Exception): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + class Sender(Lockable): """ |