summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-24 17:34:34 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-24 17:34:34 +0000
commit8ebeb0ed934d9baae9ea72f9ad083605ecda315a (patch)
tree653c6786ab2935015e3bdb89233765996fc55d66 /python/qpid/messaging/driver.py
parentc54e5f2960f37bf059ee9b1b0560b9f9706e67e0 (diff)
downloadqpid-python-8ebeb0ed934d9baae9ea72f9ad083605ecda315a.tar.gz
added full support for unreliable, at-least-once, and at-most-once reliability options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@957644 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r--python/qpid/messaging/driver.py59
1 files changed, 41 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 7f0490332b..76ccd54e9f 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -95,6 +95,7 @@ CLIENT_PROPERTIES = {"product": "qpid python client",
"qpid.client_ppid": ppid}
def noop(): pass
+def sync_noop(): pass
class SessionState:
@@ -136,7 +137,7 @@ class SessionState:
if overrides:
self.apply_overrides(cmd, overrides)
- if sync or action != noop:
+ if action != noop:
cmd.sync = sync
if self.detached:
raise Exception("detached")
@@ -215,11 +216,14 @@ class LinkIn:
def do_link(self, sst, rcv, _rcv, type, subtype, action):
link_opts = _rcv.options.get("link", {})
- # XXX: default?
- reliability = link_opts.get("reliability", "unreliable")
+ reliability = link_opts.get("reliability", "at-least-once")
declare = link_opts.get("x-declare", {})
subscribe = link_opts.get("x-subscribe", {})
acq_mode = acquire_mode.pre_acquired
+ if reliability in ("unreliable", "at-most-once"):
+ rcv._accept_mode = accept_mode.none
+ else:
+ rcv._accept_mode = accept_mode.explicit
if type == "topic":
default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
@@ -239,9 +243,12 @@ class LinkIn:
acq_mode = acquire_mode.not_acquired
bindings = get_bindings(link_opts, queue=_rcv._queue)
+
sst.write_cmds(bindings)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
- acquire_mode = acq_mode),
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue,
+ destination=_rcv.destination,
+ acquire_mode = acq_mode,
+ accept_mode = rcv._accept_mode),
overrides=subscribe)
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
@@ -263,9 +270,12 @@ class LinkOut:
def init_link(self, sst, snd, _snd):
_snd.closing = False
+ _snd.pre_ack = False
def do_link(self, sst, snd, _snd, type, subtype, action):
link_opts = _snd.options.get("link", {})
+ reliability = link_opts.get("reliability", "at-least-once")
+ _snd.pre_ack = reliability in ("unreliable", "at-most-once")
if type == "topic":
_snd._exchange = _snd.name
_snd._routing_key = _snd.subject
@@ -968,32 +978,34 @@ class Engine:
for snd in ssn.senders:
if snd.synced >= snd.queued and sst.need_sync:
- sst.write_cmd(ExecutionSync(), sync=True)
+ sst.write_cmd(ExecutionSync(), sync_noop)
for rcv in ssn.receivers:
self.process_receiver(rcv)
if ssn.acked:
messages = ssn.acked[sst.acked_idx:]
- delta = len(messages)
if messages:
ids = RangedSet()
disposed = [(DEFAULT_DISPOSITION, [])]
+ acked = []
for m in messages:
# XXX: we're ignoring acks that get lost when disconnected,
# could we deal this via some message-id based purge?
if m._transfer_id is None:
- ssn.acked.remove(m)
- delta -= 1
+ acked.append(m)
continue
ids.add(m._transfer_id)
- disp = m._disposition or DEFAULT_DISPOSITION
- last, msgs = disposed[-1]
- if disp.type is last.type and disp.options == last.options:
- msgs.append(m)
+ if m._receiver._accept_mode is accept_mode.explicit:
+ disp = m._disposition or DEFAULT_DISPOSITION
+ last, msgs = disposed[-1]
+ if disp.type is last.type and disp.options == last.options:
+ msgs.append(m)
+ else:
+ disposed.append((disp, [m]))
else:
- disposed.append((disp, [m]))
+ acked.append(m)
for range in ids:
sst.executed.add_range(range)
@@ -1004,6 +1016,7 @@ class Engine:
for m in msgs:
ssn.acked.remove(m)
sst.acked_idx -= 1
+ # XXX: should this check accept_mode too?
if not ssn.transactional:
sst.acked.remove(m)
return ack_ack
@@ -1023,9 +1036,9 @@ class Engine:
for m in msgs:
log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
- # XXX: could add messages with _transfer_id of None
sst.acked.extend(messages)
- sst.acked_idx += delta
+ sst.acked_idx += len(messages)
+ ack_acker(acked)()
if ssn.committing and not sst.committing:
def commit_ok():
@@ -1173,10 +1186,20 @@ class Engine:
sst.outgoing_idx -= 1
log.debug("RACK[%s]: %s", sst.session.log_id, msg)
assert msg == m
- sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body), msg_acked, sync=msg._sync)
+
+ xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+ payload=body)
+
+ if _snd.pre_ack:
+ sst.write_cmd(xfr)
+ else:
+ sst.write_cmd(xfr, msg_acked, sync=msg._sync)
+
log.debug("SENT[%s]: %s", sst.session.log_id, msg)
+ if _snd.pre_ack:
+ msg_acked()
+
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)
ssn = sst.session