summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-06-04 15:57:48 +0000
committerRafael H. Schloming <rhs@apache.org>2009-06-04 15:57:48 +0000
commit03e9ba8e51bae6344c73ba263d1c8a0c86dbbdb1 (patch)
tree6c6b19d78b00f961e71ec774003046c23df7f229 /python/qpid/messaging.py
parentcad0c4d7ef0c341afe8eb0bd7f460803f274f263 (diff)
downloadqpid-python-03e9ba8e51bae6344c73ba263d1c8a0c86dbbdb1.tar.gz
Added commit and rollback to the Session API and streamlined some test utilities.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781786 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py64
1 files changed, 58 insertions, 6 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 84331134a9..05e2f7c51f 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -147,7 +147,7 @@ class Connection(Lockable):
self._condition = Condition(self._lock)
@synchronized
- def session(self, name=None):
+ def session(self, name=None, transactional=False):
"""
Creates or retrieves the named session. If the name is omitted or
None, then a unique name is chosen based on a randomly generated
@@ -168,7 +168,7 @@ class Connection(Lockable):
if self.sessions.has_key(name):
return self.sessions[name]
else:
- ssn = Session(self, name, self.started)
+ ssn = Session(self, name, self.started, transactional=transactional)
self.sessions[name] = ssn
if self._conn is not None:
ssn._attach()
@@ -268,10 +268,11 @@ class Session(Lockable):
messages, and manage various Senders and Receivers.
"""
- def __init__(self, connection, name, started):
+ def __init__(self, connection, name, started, transactional):
self.connection = connection
self.name = name
self.started = started
+ self.transactional = transactional
self._ssn = None
self.senders = []
self.receivers = []
@@ -279,6 +280,8 @@ class Session(Lockable):
self.incoming = []
self.closed = False
self.unacked = []
+ if self.transactional:
+ self.acked = []
self._lock = RLock()
self._condition = Condition(self._lock)
self.thread = Thread(target = self.run)
@@ -294,6 +297,8 @@ class Session(Lockable):
self._ssn.invoke_lock = self._lock
self._ssn.lock = self._lock
self._ssn.condition = self._condition
+ if self.transactional:
+ self._ssn.tx_select()
for link in self.senders + self.receivers:
link._link()
@@ -414,17 +419,17 @@ class Session(Lockable):
def acknowledge(self, message=None):
"""
Acknowledge the given L{Message}. If message is None, then all
- unackednowledged messages on the session are acknowledged.
+ unacknowledged messages on the session are acknowledged.
@type message: Message
@param message: the message to acknowledge or None
"""
if message is None:
- messages = self.unacked
+ messages = self.unacked[:]
else:
messages = [message]
- ids = RangedSet(*[m._transfer_id for m in self.unacked])
+ ids = RangedSet(*[m._transfer_id for m in messages])
for range in ids:
self._ssn.receiver._completed.add_range(range)
self._ssn.channel.session_completed(self._ssn.receiver._completed)
@@ -436,6 +441,46 @@ class Session(Lockable):
self.unacked.remove(m)
except ValueError:
pass
+ if self.transactional:
+ self.acked.append(m)
+
+ @synchronized
+ def commit(self):
+ """
+ Commit outstanding transactional work. This consists of all
+ message sends and receives since the prior commit or rollback.
+ """
+ if not self.transactional:
+ raise NontransactionalSession()
+ if self._ssn is None:
+ raise Disconnected()
+ self._ssn.tx_commit(sync=True)
+ del self.acked[:]
+ self._ssn.sync()
+
+ @synchronized
+ def rollback(self):
+ """
+ Rollback outstanding transactional work. This consists of all
+ message sends and receives since the prior commit or rollback.
+ """
+ if not self.transactional:
+ raise NontransactionalSession()
+ if self._ssn is None:
+ raise Disconnected()
+
+ ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming])
+ for range in ids:
+ self._ssn.receiver._completed.add_range(range)
+ self._ssn.channel.session_completed(self._ssn.receiver._completed)
+ self._ssn.message_release(ids)
+ self._ssn.tx_rollback(sync=True)
+
+ del self.incoming[:]
+ del self.unacked[:]
+ del self.acked[:]
+
+ self._ssn.sync()
@synchronized
def start(self):
@@ -515,6 +560,13 @@ class Disconnected(Exception):
"""
pass
+class NontransactionalSession(Exception):
+ """
+ Exception raised when commit or rollback is attempted on a non
+ transactional session.
+ """
+ pass
+
class Sender(Lockable):
"""