diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-06-24 17:34:34 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-24 17:34:34 +0000 |
commit | 8ebeb0ed934d9baae9ea72f9ad083605ecda315a (patch) | |
tree | 653c6786ab2935015e3bdb89233765996fc55d66 /python/qpid/messaging/driver.py | |
parent | c54e5f2960f37bf059ee9b1b0560b9f9706e67e0 (diff) | |
download | qpid-python-8ebeb0ed934d9baae9ea72f9ad083605ecda315a.tar.gz |
added full support for unreliable, at-least-once, and at-most-once reliability options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@957644 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 59 |
1 files changed, 41 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 7f0490332b..76ccd54e9f 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -95,6 +95,7 @@ CLIENT_PROPERTIES = {"product": "qpid python client", "qpid.client_ppid": ppid} def noop(): pass +def sync_noop(): pass class SessionState: @@ -136,7 +137,7 @@ class SessionState: if overrides: self.apply_overrides(cmd, overrides) - if sync or action != noop: + if action != noop: cmd.sync = sync if self.detached: raise Exception("detached") @@ -215,11 +216,14 @@ class LinkIn: def do_link(self, sst, rcv, _rcv, type, subtype, action): link_opts = _rcv.options.get("link", {}) - # XXX: default? - reliability = link_opts.get("reliability", "unreliable") + reliability = link_opts.get("reliability", "at-least-once") declare = link_opts.get("x-declare", {}) subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired + if reliability in ("unreliable", "at-most-once"): + rcv._accept_mode = accept_mode.none + else: + rcv._accept_mode = accept_mode.explicit if type == "topic": default_name = "%s.%s" % (rcv.session.name, _rcv.destination) @@ -239,9 +243,12 @@ class LinkIn: acq_mode = acquire_mode.not_acquired bindings = get_bindings(link_opts, queue=_rcv._queue) + sst.write_cmds(bindings) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode), + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, + destination=_rcv.destination, + acquire_mode = acq_mode, + accept_mode = rcv._accept_mode), overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) @@ -263,9 +270,12 @@ class LinkOut: def init_link(self, sst, snd, _snd): _snd.closing = False + _snd.pre_ack = False def do_link(self, sst, snd, _snd, type, subtype, action): link_opts = _snd.options.get("link", {}) + reliability = link_opts.get("reliability", "at-least-once") + _snd.pre_ack = reliability in ("unreliable", "at-most-once") if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject @@ -968,32 +978,34 @@ class Engine: for snd in ssn.senders: if snd.synced >= snd.queued and sst.need_sync: - sst.write_cmd(ExecutionSync(), sync=True) + sst.write_cmd(ExecutionSync(), sync_noop) for rcv in ssn.receivers: self.process_receiver(rcv) if ssn.acked: messages = ssn.acked[sst.acked_idx:] - delta = len(messages) if messages: ids = RangedSet() disposed = [(DEFAULT_DISPOSITION, [])] + acked = [] for m in messages: # XXX: we're ignoring acks that get lost when disconnected, # could we deal this via some message-id based purge? if m._transfer_id is None: - ssn.acked.remove(m) - delta -= 1 + acked.append(m) continue ids.add(m._transfer_id) - disp = m._disposition or DEFAULT_DISPOSITION - last, msgs = disposed[-1] - if disp.type is last.type and disp.options == last.options: - msgs.append(m) + if m._receiver._accept_mode is accept_mode.explicit: + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) else: - disposed.append((disp, [m])) + acked.append(m) for range in ids: sst.executed.add_range(range) @@ -1004,6 +1016,7 @@ class Engine: for m in msgs: ssn.acked.remove(m) sst.acked_idx -= 1 + # XXX: should this check accept_mode too? if not ssn.transactional: sst.acked.remove(m) return ack_ack @@ -1023,9 +1036,9 @@ class Engine: for m in msgs: log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) - # XXX: could add messages with _transfer_id of None sst.acked.extend(messages) - sst.acked_idx += delta + sst.acked_idx += len(messages) + ack_acker(acked)() if ssn.committing and not sst.committing: def commit_ok(): @@ -1173,10 +1186,20 @@ class Engine: sst.outgoing_idx -= 1 log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m - sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body), msg_acked, sync=msg._sync) + + xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body) + + if _snd.pre_ack: + sst.write_cmd(xfr) + else: + sst.write_cmd(xfr, msg_acked, sync=msg._sync) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) + if _snd.pre_ack: + msg_acked() + def do_message_transfer(self, xfr): sst = self.get_sst(xfr) ssn = sst.session |