summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-14 16:08:05 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-14 16:08:05 +0000
commit454379917ad7b797a045cbefc56bf598e3fd534b (patch)
tree9b339c50c2a9a4abd7d993874f0699dcfb043d0f /python
parentcaac82682127f212d2d714154d09fc51244cd4ae (diff)
downloadqpid-python-454379917ad7b797a045cbefc56bf598e3fd534b.tar.gz
added support for sender/receiver delete, made tests clean up after themselves, split logging of raw bytes and unencoded ops into distinct categories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@836200 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/driver.py71
-rw-r--r--python/qpid/messaging.py13
-rw-r--r--python/qpid/tests/messaging.py136
3 files changed, 151 insertions, 69 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 3ea6655aca..3e45045bec 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -30,6 +30,8 @@ from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
+rawlog = getLogger("qpid.messaging.io.raw")
+opslog = getLogger("qpid.messaging.io.ops")
def addr2reply_to(addr):
name, subject, options = address.parse(addr)
@@ -101,10 +103,11 @@ class SessionState:
def write_query(self, query, handler):
id = self.sent
- query.sync = True
self.write_cmd(query, lambda: handler(self.results.pop(id)))
def write_cmd(self, cmd, completion=noop):
+ if completion != noop:
+ cmd.sync = True
if self.detached:
raise Exception("detached")
cmd.id = self.sent
@@ -195,9 +198,9 @@ class Driver:
try:
data = self._socket.recv(64*1024)
if data:
- log.debug("READ: %r", data)
+ rawlog.debug("READ: %r", data)
else:
- log.debug("ABORTED: %s", self._socket.getpeername())
+ rawlog.debug("ABORTED: %s", self._socket.getpeername())
error = "connection aborted"
recoverable = True
except socket.error, e:
@@ -219,7 +222,7 @@ class Driver:
self._op_dec.write(*self._seg_dec.read())
for op in self._op_dec.read():
self.assign_id(op)
- log.debug("RCVD: %r", op)
+ opslog.debug("RCVD: %r", op)
op.dispatch(self)
except VersionError, e:
error = e
@@ -244,7 +247,7 @@ class Driver:
def writeable(self):
try:
n = self._socket.send(self._buf)
- log.debug("SENT: %r", self._buf[:n])
+ rawlog.debug("SENT: %r", self._buf[:n])
self._buf = self._buf[n:]
except socket.error, e:
self._error(e, True)
@@ -268,7 +271,7 @@ class Driver:
self.connection.error = (err,)
def write_op(self, op):
- log.debug("SENT: %r", op)
+ opslog.debug("SENT: %r", op)
self._op_enc.write(op)
self._seg_enc.write(*self._op_enc.read())
self._frame_enc.write(*self._seg_enc.read())
@@ -446,6 +449,7 @@ class Driver:
_snd = self._attachments.get(snd)
if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
+ _snd.closing = False
if snd.target is None:
snd.error = ("target is None",)
@@ -488,7 +492,7 @@ class Driver:
return
if result.not_found:
- if _snd.options.get("create") in ("always", "receiver"):
+ if _snd.options.get("create") in ("always", "sender"):
sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
_snd._exchange = ""
_snd._routing_key = _snd.name
@@ -503,9 +507,15 @@ class Driver:
sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
- if snd.closing and not snd.closed:
- del self._attachments[snd]
- snd.closed = True
+ if snd.closing and not (snd.closed or _snd.closing):
+ _snd.closing = True
+ def do_unlink():
+ del self._attachments[snd]
+ snd.closed = True
+ if _snd.options.get("delete") in ("always", "sender"):
+ self.delete(sst, _snd.name, do_unlink)
+ else:
+ do_unlink()
def link_in(self, rcv):
sst = self._attachments.get(rcv.session)
@@ -584,16 +594,37 @@ class Driver:
if rcv.closing and not rcv.closed:
if rcv.linked:
if not _rcv.canceled:
- def close_rcv():
+ def do_unlink():
del self._attachments[rcv]
rcv.closed = True
- sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ if _rcv.options.get("delete") in ("always", "receiver"):
+ sst.write_cmd(MessageCancel(rcv.destination))
+ self.delete(sst, _rcv.name, do_unlink)
+ else:
+ sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
_rcv.canceled = True
else:
rcv.closed = True
+ def delete(self, sst, name, completion):
+ def do_queue_delq(result):
+ if sst.detached:
+ return
+ if result.queue:
+ sst.write_cmd(QueueDelete(name), completion)
+ else:
+ completion()
+ def do_exchange_delq(result):
+ if sst.detached:
+ return
+ if result.not_found:
+ sst.write_query(QueueQuery(name), do_queue_delq)
+ else:
+ sst.write_cmd(ExchangeDelete(name), completion)
+ sst.write_query(ExchangeQuery(name), do_exchange_delq)
+
def process(self, ssn):
- if ssn.closing: return
+ if ssn.closed or ssn.closing: return
sst = self._attachments[ssn]
@@ -625,7 +656,7 @@ class Driver:
ssn.acked.remove(m)
if not ssn.transactional:
sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+ sst.write_cmd(MessageAccept(ids), ack_ack)
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -635,7 +666,7 @@ class Driver:
ssn.committed = True
ssn.aborting = False
ssn.aborted = False
- sst.write_cmd(TxCommit(sync=True), commit_ok)
+ sst.write_cmd(TxCommit(), commit_ok)
sst.committing = True
if ssn.aborting and not sst.aborting:
@@ -647,7 +678,7 @@ class Driver:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
sst.write_cmd(MessageRelease(ids))
- sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+ sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
del ssn.incoming[:]
@@ -670,7 +701,7 @@ class Driver:
for rcv in ssn.receivers:
sst.write_cmd(MessageStop(rcv.destination))
- sst.write_cmd(ExecutionSync(sync=True), do_rb)
+ sst.write_cmd(ExecutionSync(), do_rb)
def grant(self, rcv):
sst = self._attachments[rcv.session]
@@ -702,7 +733,7 @@ class Driver:
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+ sst.write_cmd(MessageStop(rcv.destination), do_stop)
if rcv.draining:
_rcv.draining = True
@@ -711,7 +742,7 @@ class Driver:
rcv.granted = rcv.impending
_rcv.draining = False
rcv.draining = False
- sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+ sst.write_cmd(MessageFlush(rcv.destination), do_flush)
def process_receiver(self, rcv):
@@ -758,7 +789,7 @@ class Driver:
sst.outgoing_idx -= 1
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body, sync=True), msg_acked)
+ payload=body), msg_acked)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index ec1c054e14..703a958425 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -436,14 +436,15 @@ class Session:
"""
Close the session.
"""
+ # XXX: should be able to express this condition through API calls
+ self._ewait(lambda: not self.outgoing and not self.acked)
+
for link in self.receivers + self.senders:
link.close()
self.closing = True
self._wakeup()
self._ewait(lambda: self.closed)
- # XXX: should be able to express this condition through API calls
- self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
class SendError(SessionError):
@@ -557,10 +558,12 @@ class Sender:
"""
Close the Sender.
"""
- # XXX: should make driver do something here
- if not self.closed:
+ self.closing = True
+ self._wakeup()
+ try:
+ self.session._ewait(lambda: self.closed)
+ finally:
self.session.senders.remove(self)
- self.closed = True
class ReceiveError(SessionError):
pass
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 8cadb2a8fe..474c9ab630 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -25,7 +25,8 @@ from qpid import compat
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
+ UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -66,7 +67,7 @@ class Base(Test):
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always}'
+ PING_Q = 'ping-queue; {create: always, delete: always}'
# send a message
sender = ssn.sender(PING_Q, durable=self.durable())
content = self.content("ping")
@@ -173,7 +174,8 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.connected()
-ACK_Q = 'test-ack-queue; {create: always}'
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
class SessionTests(Base):
@@ -185,7 +187,7 @@ class SessionTests(Base):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender('test-snd-queue; {create: always}',
+ snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
durable=self.durable())
snd2 = self.ssn.sender(snd.target, durable=self.durable())
assert snd is not snd2
@@ -199,7 +201,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
@@ -212,7 +214,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testNextReceiver(self):
- ADDR = 'test-next-rcv-queue; {create: always}'
+ ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
@@ -241,14 +243,14 @@ class SessionTests(Base):
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender(ACK_Q, durable=self.durable())
+ snd = self.ssn.sender(ACK_QC, durable=self.durable())
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QC)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -257,7 +259,7 @@ class SessionTests(Base):
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QC)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -265,7 +267,7 @@ class SessionTests(Base):
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QD)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -283,7 +285,7 @@ class SessionTests(Base):
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver(ACK_Q))
+ self.drain(self.ssn.receiver(ACK_QD))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -306,8 +308,8 @@ class SessionTests(Base):
return contents
def txTest(self, commit):
- TX_Q = 'test-tx-queue; {create: always}'
- TX_Q_COPY = 'test-tx-queue-copy; {create: always}'
+ TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+ TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
contents = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
@@ -337,7 +339,7 @@ class SessionTests(Base):
self.txTest(False)
def txTestSend(self, commit):
- TX_SEND_Q = 'test-tx-send-queue; {create: always}'
+ TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
@@ -360,11 +362,12 @@ class SessionTests(Base):
self.txTestSend(False)
def txTestAck(self, commit):
- TX_ACK_Q = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_Q)
+ txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -382,11 +385,11 @@ class SessionTests(Base):
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_Q)
+ txrcv = txssn.receiver(TX_ACK_QC)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver(TX_ACK_Q)
+ rcv = self.ssn.receiver(TX_ACK_QD)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -405,7 +408,7 @@ class SessionTests(Base):
except Disconnected:
pass
-RECEIVER_Q = 'test-receiver-queue; {create: always}'
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
class ReceiverTests(Base):
@@ -482,28 +485,6 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
- def testPending(self):
- self.rcv.capacity = UNLIMITED
- assert self.rcv.pending() == 0
-
- for i in range(3):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 3
-
- for i in range(3, 10):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 10
-
- self.drain(self.rcv, limit=3)
- assert self.rcv.pending() == 7
-
- self.drain(self.rcv)
- assert self.rcv.pending() == 0
-
- self.ssn.acknowledge()
-
def testCapacity(self):
self.rcv.capacity = 5
self.assertPending(self.rcv, 0)
@@ -537,8 +518,75 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
+ def testPending(self):
+ self.rcv.capacity = UNLIMITED
+ assert self.rcv.pending() == 0
+
+ for i in range(3):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 3
+
+ for i in range(3, 10):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 10
+
+ self.drain(self.rcv, limit=3)
+ assert self.rcv.pending() == 7
+
+ self.drain(self.rcv)
+ assert self.rcv.pending() == 0
+
+ self.ssn.acknowledge()
+
# XXX: need testClose
+class AddressTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testDeleteBySender(self):
+ snd = self.ssn.sender("test-delete; {create: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete; {delete: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete")
+ try:
+ snd.send("ping")
+ except SendError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteByReceiver(self):
+ rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+ try:
+ rcv.fetch(0)
+ except Empty:
+ pass
+ rcv.close()
+
+ try:
+ self.ssn.receiver("test-delete")
+ except SendError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteSpecial(self):
+ snd = self.ssn.sender("amq.topic; {delete: always}")
+ snd.send("asdf")
+ try:
+ snd.close()
+ except SessionError, e:
+ assert "Cannot delete default exchange" in str(e)
+ # XXX: need to figure out close after error
+ self.conn._remove_session(self.ssn)
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -606,7 +654,7 @@ class AddressErrorTests(Base):
self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
lambda e: "unrecognized characters" in str(e))
-SENDER_Q = 'test-sender-q; {create: always}'
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
class SenderTests(Base):
@@ -715,7 +763,7 @@ class MessageTests(Base):
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
-ECHO_Q = 'test-message-echo-queue; {create: always}'
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
class MessageEchoTests(Base):