diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
commit | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch) | |
tree | 7fab04466df2bb9e33e9e83ccc3286a420f0ee0d | |
parent | 195193dab20a2e7481e470ddc8226cff9102e1fb (diff) | |
download | qpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz |
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/messaging/constants.py | 12 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 70 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 3 | ||||
-rw-r--r-- | python/qpid/messaging/message.py | 15 | ||||
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 21 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 71 |
6 files changed, 148 insertions, 44 deletions
diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py index cad47bd52a..f230c4def8 100644 --- a/python/qpid/messaging/constants.py +++ b/python/qpid/messaging/constants.py @@ -17,11 +17,16 @@ # under the License. # +__SELF__ = object() + class Constant: - def __init__(self, name, value=None): + def __init__(self, name, value=__SELF__): self.name = name - self.value = value + if value is __SELF__: + self.value = self + else: + self.value = value def __repr__(self): return self.name @@ -30,3 +35,6 @@ AMQP_PORT = 5672 AMQPS_PORT = 5671 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + +REJECTED = Constant("REJECTED") +RELEASED = Constant("RELEASED") diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index d0f5b746f3..383845f214 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -18,7 +18,7 @@ # import socket, struct, sys, time -from logging import getLogger +from logging import getLogger, DEBUG from qpid import compat from qpid import sasl from qpid.concurrency import synchronized @@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address -from qpid.messaging.constants import UNLIMITED +from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError -from qpid.messaging.message import get_codec, Message +from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect @@ -435,6 +435,8 @@ class Driver: self._host = (self._host + 1) % len(self._hosts) self.close_engine(e) +DEFAULT_DISPOSITION = Disposition(None) + class Engine: def __init__(self, connection): @@ -915,19 +917,49 @@ class Engine: if ssn.acked: messages = [m for m in ssn.acked if m not in sst.acked] if messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + ids = RangedSet() + + disposed = [(DEFAULT_DISPOSITION, [])] + for m in messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + if m._transfer_id is None: + continue + ids.add(m._transfer_id) + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) + for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - def ack_ack(): - for m in messages: - ssn.acked.remove(m) - if not ssn.transactional: - sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids), ack_ack) - log.debug("SACK[%s]: %s", ssn.log_id, m) + + def ack_acker(msgs): + def ack_ack(): + for m in msgs: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + return ack_ack + + for disp, msgs in disposed: + if not msgs: continue + if disp.type is None: + op = MessageAccept + elif disp.type is RELEASED: + op = MessageRelease + elif disp.type is REJECTED: + op = MessageReject + sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), + **disp.options), + ack_acker(msgs)) + if log.isEnabledFor(DEBUG): + for m in msgs: + log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) + sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -948,7 +980,7 @@ class Engine: for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(MessageRelease(ids, True)) sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): @@ -1055,8 +1087,11 @@ class Engine: if mp.application_headers is None: mp.application_headers = {} mp.application_headers[TO] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent + if msg.durable is not None: + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + else: + dp.delivery_mode = delivery_mode.non_persistent if msg.priority is not None: dp.priority = msg.priority if msg.ttl is not None: @@ -1109,7 +1144,8 @@ class Engine: if mp.reply_to is not None: msg.reply_to = reply_to2addr(mp.reply_to) msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent + if dp.delivery_mode is not None: + msg.durable = dp.delivery_mode == delivery_mode.persistent msg.priority = dp.priority msg.ttl = dp.ttl msg.redelivered = dp.redelivered diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 004cee5f88..53df51dfd8 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -508,7 +508,7 @@ class Session: raise Empty @synchronized - def acknowledge(self, message=None, sync=True): + def acknowledge(self, message=None, disposition=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @@ -530,6 +530,7 @@ class Session: raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) self._wakeup() self._ewait(lambda: len(self.acked) < self.ack_capacity) + m._disposition = disposition self.unacked.remove(m) self.acked.append(m) diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py index 46494e428e..a9660b05b1 100644 --- a/python/qpid/messaging/message.py +++ b/python/qpid/messaging/message.py @@ -129,7 +129,7 @@ class Message: "correlation_id", "priority", "ttl"]: value = self.__dict__[name] if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "properties"]: + for name in ["durable", "redelivered", "properties"]: value = self.__dict__[name] if value: args.append("%s=%r" % (name, value)) if self.content_type != get_type(self.content): @@ -141,4 +141,15 @@ class Message: args.append(repr(self.content)) return "Message(%s)" % ", ".join(args) -__all__ = ["Message"] +class Disposition: + + def __init__(self, type, **options): + self.type = type + self.options = options + + def __repr__(self): + args = [str(self.type)] + \ + ["%s=%r" % (k, v) for k, v in self.options.items()] + return "Disposition(%s)" % ", ".join(args) + +__all__ = ["Message", "Disposition"] diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py index e47f8cb119..0744932944 100644 --- a/python/qpid/tests/messaging/__init__.py +++ b/python/qpid/tests/messaging/__init__.py @@ -59,6 +59,9 @@ class Base(Test): else: return "%s[%s, %s]" % (base, count, self.test_id) + def message(self, base, count = None): + return Message(self.content(base, count)) + def ping(self, ssn): PING_Q = 'ping-queue; {create: always, delete: always}' # send a message @@ -70,7 +73,7 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None, timeout=0, expected=None): + def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False): messages = [] try: while limit is None or len(messages) < limit: @@ -78,21 +81,22 @@ class Base(Test): except Empty: pass if expected is not None: - self.assertEchos(expected, messages) + self.assertEchos(expected, messages, redelivered) return messages def diff(self, m1, m2): result = {} for attr in ("id", "subject", "user_id", "to", "reply_to", "correlation_id", "durable", "priority", "ttl", - "properties", "content_type", "content"): + "redelivered", "properties", "content_type", + "content"): a1 = getattr(m1, attr) a2 = getattr(m2, attr) if a1 != a2: result[attr] = (a1, a2) return result - def assertEcho(self, msg, echo): + def assertEcho(self, msg, echo, redelivered=False): if not isinstance(msg, Message) or not isinstance(echo, Message): if isinstance(msg, Message): msg = msg.content @@ -102,14 +106,19 @@ class Base(Test): else: delta = self.diff(msg, echo) mttl, ettl = delta.pop("ttl", (0, 0)) + if redelivered: + assert echo.redelivered, \ + "expected %s to be redelivered: %s" % (msg, echo) + if delta.has_key("redelivered"): + del delta["redelivered"] assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl) assert mttl >= ettl, "%s, %s" % (mttl, ettl) assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta) - def assertEchos(self, msgs, echoes): + def assertEchos(self, msgs, echoes, redelivered=False): assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes) for m, e in zip(msgs, echoes): - self.assertEcho(m, e) + self.assertEcho(m, e, redelivered) def assertEmpty(self, rcv): contents = self.drain(rcv) diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index 049413206b..3f2c823914 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -227,21 +227,60 @@ class SessionTests(Base): def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + alternate_exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] for i in range(count): - c = self.content(base, i) + c = self.message(base, i) snd.send(c) - contents.append(c) + messages.append(c) snd.close() - return contents + return messages def txTest(self, commit): TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) + messages = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) @@ -255,10 +294,10 @@ class SessionTests(Base): if commit: txssn.commit() self.assertEmpty(rcv) - self.drain(copy_rcv, expected=contents) + self.drain(copy_rcv, expected=messages) else: txssn.rollback() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages, redelivered=True) self.assertEmpty(copy_rcv) self.ssn.acknowledge() @@ -271,13 +310,13 @@ class SessionTests(Base): def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages) self.ssn.acknowledge() else: txssn.rollback() @@ -297,17 +336,17 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - self.drain(txrcv, expected=contents) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) if commit: txssn.acknowledge() else: txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.commit() # commit without ack self.assertEmpty(txrcv) @@ -315,7 +354,7 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_QD) |