diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 16:08:05 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 16:08:05 +0000 |
commit | 454379917ad7b797a045cbefc56bf598e3fd534b (patch) | |
tree | 9b339c50c2a9a4abd7d993874f0699dcfb043d0f /python | |
parent | caac82682127f212d2d714154d09fc51244cd4ae (diff) | |
download | qpid-python-454379917ad7b797a045cbefc56bf598e3fd534b.tar.gz |
added support for sender/receiver delete, made tests clean up after themselves, split logging of raw bytes and unencoded ops into distinct categories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@836200 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/driver.py | 71 | ||||
-rw-r--r-- | python/qpid/messaging.py | 13 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 136 |
3 files changed, 151 insertions, 69 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 3ea6655aca..3e45045bec 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -30,6 +30,8 @@ from threading import Condition, Thread from util import connect log = getLogger("qpid.messaging") +rawlog = getLogger("qpid.messaging.io.raw") +opslog = getLogger("qpid.messaging.io.ops") def addr2reply_to(addr): name, subject, options = address.parse(addr) @@ -101,10 +103,11 @@ class SessionState: def write_query(self, query, handler): id = self.sent - query.sync = True self.write_cmd(query, lambda: handler(self.results.pop(id))) def write_cmd(self, cmd, completion=noop): + if completion != noop: + cmd.sync = True if self.detached: raise Exception("detached") cmd.id = self.sent @@ -195,9 +198,9 @@ class Driver: try: data = self._socket.recv(64*1024) if data: - log.debug("READ: %r", data) + rawlog.debug("READ: %r", data) else: - log.debug("ABORTED: %s", self._socket.getpeername()) + rawlog.debug("ABORTED: %s", self._socket.getpeername()) error = "connection aborted" recoverable = True except socket.error, e: @@ -219,7 +222,7 @@ class Driver: self._op_dec.write(*self._seg_dec.read()) for op in self._op_dec.read(): self.assign_id(op) - log.debug("RCVD: %r", op) + opslog.debug("RCVD: %r", op) op.dispatch(self) except VersionError, e: error = e @@ -244,7 +247,7 @@ class Driver: def writeable(self): try: n = self._socket.send(self._buf) - log.debug("SENT: %r", self._buf[:n]) + rawlog.debug("SENT: %r", self._buf[:n]) self._buf = self._buf[n:] except socket.error, e: self._error(e, True) @@ -268,7 +271,7 @@ class Driver: self.connection.error = (err,) def write_op(self, op): - log.debug("SENT: %r", op) + opslog.debug("SENT: %r", op) self._op_enc.write(op) self._seg_enc.write(*self._op_enc.read()) self._frame_enc.write(*self._seg_enc.read()) @@ -446,6 +449,7 @@ class Driver: _snd = self._attachments.get(snd) if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) + _snd.closing = False if snd.target is None: snd.error = ("target is None",) @@ -488,7 +492,7 @@ class Driver: return if result.not_found: - if _snd.options.get("create") in ("always", "receiver"): + if _snd.options.get("create") in ("always", "sender"): sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) _snd._exchange = "" _snd._routing_key = _snd.name @@ -503,9 +507,15 @@ class Driver: sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) self._attachments[snd] = _snd - if snd.closing and not snd.closed: - del self._attachments[snd] - snd.closed = True + if snd.closing and not (snd.closed or _snd.closing): + _snd.closing = True + def do_unlink(): + del self._attachments[snd] + snd.closed = True + if _snd.options.get("delete") in ("always", "sender"): + self.delete(sst, _snd.name, do_unlink) + else: + do_unlink() def link_in(self, rcv): sst = self._attachments.get(rcv.session) @@ -584,16 +594,37 @@ class Driver: if rcv.closing and not rcv.closed: if rcv.linked: if not _rcv.canceled: - def close_rcv(): + def do_unlink(): del self._attachments[rcv] rcv.closed = True - sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + if _rcv.options.get("delete") in ("always", "receiver"): + sst.write_cmd(MessageCancel(rcv.destination)) + self.delete(sst, _rcv.name, do_unlink) + else: + sst.write_cmd(MessageCancel(rcv.destination), do_unlink) _rcv.canceled = True else: rcv.closed = True + def delete(self, sst, name, completion): + def do_queue_delq(result): + if sst.detached: + return + if result.queue: + sst.write_cmd(QueueDelete(name), completion) + else: + completion() + def do_exchange_delq(result): + if sst.detached: + return + if result.not_found: + sst.write_query(QueueQuery(name), do_queue_delq) + else: + sst.write_cmd(ExchangeDelete(name), completion) + sst.write_query(ExchangeQuery(name), do_exchange_delq) + def process(self, ssn): - if ssn.closing: return + if ssn.closed or ssn.closing: return sst = self._attachments[ssn] @@ -625,7 +656,7 @@ class Driver: ssn.acked.remove(m) if not ssn.transactional: sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids, sync=True), ack_ack) + sst.write_cmd(MessageAccept(ids), ack_ack) sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -635,7 +666,7 @@ class Driver: ssn.committed = True ssn.aborting = False ssn.aborted = False - sst.write_cmd(TxCommit(sync=True), commit_ok) + sst.write_cmd(TxCommit(), commit_ok) sst.committing = True if ssn.aborting and not sst.aborting: @@ -647,7 +678,7 @@ class Driver: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) sst.write_cmd(MessageRelease(ids)) - sst.write_cmd(TxRollback(sync=True), do_rb_ok) + sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): del ssn.incoming[:] @@ -670,7 +701,7 @@ class Driver: for rcv in ssn.receivers: sst.write_cmd(MessageStop(rcv.destination)) - sst.write_cmd(ExecutionSync(sync=True), do_rb) + sst.write_cmd(ExecutionSync(), do_rb) def grant(self, rcv): sst = self._attachments[rcv.session] @@ -702,7 +733,7 @@ class Driver: rcv.impending = rcv.received _rcv.draining = False self.grant(rcv) - sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop) + sst.write_cmd(MessageStop(rcv.destination), do_stop) if rcv.draining: _rcv.draining = True @@ -711,7 +742,7 @@ class Driver: rcv.granted = rcv.impending _rcv.draining = False rcv.draining = False - sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush) + sst.write_cmd(MessageFlush(rcv.destination), do_flush) def process_receiver(self, rcv): @@ -758,7 +789,7 @@ class Driver: sst.outgoing_idx -= 1 assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body, sync=True), msg_acked) + payload=body), msg_acked) def do_message_transfer(self, xfr): sst = self.get_sst(xfr) diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index ec1c054e14..703a958425 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -436,14 +436,15 @@ class Session: """ Close the session. """ + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) + for link in self.receivers + self.senders: link.close() self.closing = True self._wakeup() self._ewait(lambda: self.closed) - # XXX: should be able to express this condition through API calls - self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) class SendError(SessionError): @@ -557,10 +558,12 @@ class Sender: """ Close the Sender. """ - # XXX: should make driver do something here - if not self.closed: + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: self.session.senders.remove(self) - self.closed = True class ReceiveError(SessionError): pass diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 8cadb2a8fe..474c9ab630 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -25,7 +25,8 @@ from qpid import compat from qpid.tests import Test from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4 + InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \ + UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -66,7 +67,7 @@ class Base(Test): return "%s[%s, %s]" % (base, count, self.test_id) def ping(self, ssn): - PING_Q = 'ping-queue; {create: always}' + PING_Q = 'ping-queue; {create: always, delete: always}' # send a message sender = ssn.sender(PING_Q, durable=self.durable()) content = self.content("ping") @@ -173,7 +174,8 @@ class ConnectionTests(Base): self.conn.close() assert not self.conn.connected() -ACK_Q = 'test-ack-queue; {create: always}' +ACK_QC = 'test-ack-queue; {create: always}' +ACK_QD = 'test-ack-queue; {delete: always}' class SessionTests(Base): @@ -185,7 +187,7 @@ class SessionTests(Base): return self.conn.session() def testSender(self): - snd = self.ssn.sender('test-snd-queue; {create: always}', + snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', durable=self.durable()) snd2 = self.ssn.sender(snd.target, durable=self.durable()) assert snd is not snd2 @@ -199,7 +201,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always}') + rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() @@ -212,7 +214,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testNextReceiver(self): - ADDR = 'test-next-rcv-queue; {create: always}' + ADDR = 'test-next-rcv-queue; {create: always, delete: always}' rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) @@ -241,14 +243,14 @@ class SessionTests(Base): # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): # send a bunch of messages - snd = self.ssn.sender(ACK_Q, durable=self.durable()) + snd = self.ssn.sender(ACK_QC, durable=self.durable()) contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QC) self.drain(rcv, expected=contents) self.ssn.close() @@ -257,7 +259,7 @@ class SessionTests(Base): self.ssn = self.conn.session() if ack_capacity is not None: self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QC) self.drain(rcv, expected=contents) acker(self.ssn) self.ssn.close() @@ -265,7 +267,7 @@ class SessionTests(Base): # drain the queue a final time and verify that the messages were # dequeued self.ssn = self.conn.session() - rcv = self.ssn.receiver(ACK_Q) + rcv = self.ssn.receiver(ACK_QD) self.assertEmpty(rcv) def testAcknowledge(self): @@ -283,7 +285,7 @@ class SessionTests(Base): pass finally: self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver(ACK_Q)) + self.drain(self.ssn.receiver(ACK_QD)) self.ssn.acknowledge() def testAcknowledgeAsyncAckCap1(self): @@ -306,8 +308,8 @@ class SessionTests(Base): return contents def txTest(self, commit): - TX_Q = 'test-tx-queue; {create: always}' - TX_Q_COPY = 'test-tx-queue-copy; {create: always}' + TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' + TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) contents = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) @@ -337,7 +339,7 @@ class SessionTests(Base): self.txTest(False) def txTestSend(self, commit): - TX_SEND_Q = 'test-tx-send-queue; {create: always}' + TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) @@ -360,11 +362,12 @@ class SessionTests(Base): self.txTestSend(False) def txTestAck(self, commit): - TX_ACK_Q = 'test-tx-ack-queue; {create: always}' + TX_ACK_QC = 'test-tx-ack-queue; {create: always}' + TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_Q) + txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3) + contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) assert contents == self.drain(txrcv) if commit: @@ -382,11 +385,11 @@ class SessionTests(Base): txssn.close() txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_Q) + txrcv = txssn.receiver(TX_ACK_QC) assert contents == self.drain(txrcv) txssn.acknowledge() txssn.commit() - rcv = self.ssn.receiver(TX_ACK_Q) + rcv = self.ssn.receiver(TX_ACK_QD) self.assertEmpty(rcv) txssn.close() self.assertEmpty(rcv) @@ -405,7 +408,7 @@ class SessionTests(Base): except Disconnected: pass -RECEIVER_Q = 'test-receiver-queue; {create: always}' +RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' class ReceiverTests(Base): @@ -482,28 +485,6 @@ class ReceiverTests(Base): self.ssn.acknowledge() - def testPending(self): - self.rcv.capacity = UNLIMITED - assert self.rcv.pending() == 0 - - for i in range(3): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 3 - - for i in range(3, 10): - self.send("testPending", i) - self.sleep() - assert self.rcv.pending() == 10 - - self.drain(self.rcv, limit=3) - assert self.rcv.pending() == 7 - - self.drain(self.rcv) - assert self.rcv.pending() == 0 - - self.ssn.acknowledge() - def testCapacity(self): self.rcv.capacity = 5 self.assertPending(self.rcv, 0) @@ -537,8 +518,75 @@ class ReceiverTests(Base): self.ssn.acknowledge() + def testPending(self): + self.rcv.capacity = UNLIMITED + assert self.rcv.pending() == 0 + + for i in range(3): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 3 + + for i in range(3, 10): + self.send("testPending", i) + self.sleep() + assert self.rcv.pending() == 10 + + self.drain(self.rcv, limit=3) + assert self.rcv.pending() == 7 + + self.drain(self.rcv) + assert self.rcv.pending() == 0 + + self.ssn.acknowledge() + # XXX: need testClose +class AddressTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def testDeleteBySender(self): + snd = self.ssn.sender("test-delete; {create: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete; {delete: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete") + try: + snd.send("ping") + except SendError, e: + assert "no such queue" in str(e) + + def testDeleteByReceiver(self): + rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") + try: + rcv.fetch(0) + except Empty: + pass + rcv.close() + + try: + self.ssn.receiver("test-delete") + except SendError, e: + assert "no such queue" in str(e) + + def testDeleteSpecial(self): + snd = self.ssn.sender("amq.topic; {delete: always}") + snd.send("asdf") + try: + snd.close() + except SessionError, e: + assert "Cannot delete default exchange" in str(e) + # XXX: need to figure out close after error + self.conn._remove_session(self.ssn) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" @@ -606,7 +654,7 @@ class AddressErrorTests(Base): self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, lambda e: "unrecognized characters" in str(e)) -SENDER_Q = 'test-sender-q; {create: always}' +SENDER_Q = 'test-sender-q; {create: always, delete: always}' class SenderTests(Base): @@ -715,7 +763,7 @@ class MessageTests(Base): m.content = u"<html/>" assert m.content_type == "text/html; charset=utf8" -ECHO_Q = 'test-message-echo-queue; {create: always}' +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' class MessageEchoTests(Base): |