summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests/messaging.py')
-rw-r--r--python/qpid/tests/messaging.py136
1 files changed, 92 insertions, 44 deletions
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 8cadb2a8fe..474c9ab630 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -25,7 +25,8 @@ from qpid import compat
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
+ UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -66,7 +67,7 @@ class Base(Test):
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always}'
+ PING_Q = 'ping-queue; {create: always, delete: always}'
# send a message
sender = ssn.sender(PING_Q, durable=self.durable())
content = self.content("ping")
@@ -173,7 +174,8 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.connected()
-ACK_Q = 'test-ack-queue; {create: always}'
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
class SessionTests(Base):
@@ -185,7 +187,7 @@ class SessionTests(Base):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender('test-snd-queue; {create: always}',
+ snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
durable=self.durable())
snd2 = self.ssn.sender(snd.target, durable=self.durable())
assert snd is not snd2
@@ -199,7 +201,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
@@ -212,7 +214,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testNextReceiver(self):
- ADDR = 'test-next-rcv-queue; {create: always}'
+ ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
@@ -241,14 +243,14 @@ class SessionTests(Base):
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender(ACK_Q, durable=self.durable())
+ snd = self.ssn.sender(ACK_QC, durable=self.durable())
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QC)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -257,7 +259,7 @@ class SessionTests(Base):
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QC)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -265,7 +267,7 @@ class SessionTests(Base):
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver(ACK_Q)
+ rcv = self.ssn.receiver(ACK_QD)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -283,7 +285,7 @@ class SessionTests(Base):
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver(ACK_Q))
+ self.drain(self.ssn.receiver(ACK_QD))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -306,8 +308,8 @@ class SessionTests(Base):
return contents
def txTest(self, commit):
- TX_Q = 'test-tx-queue; {create: always}'
- TX_Q_COPY = 'test-tx-queue-copy; {create: always}'
+ TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+ TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
contents = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
@@ -337,7 +339,7 @@ class SessionTests(Base):
self.txTest(False)
def txTestSend(self, commit):
- TX_SEND_Q = 'test-tx-send-queue; {create: always}'
+ TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
@@ -360,11 +362,12 @@ class SessionTests(Base):
self.txTestSend(False)
def txTestAck(self, commit):
- TX_ACK_Q = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_Q)
+ txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -382,11 +385,11 @@ class SessionTests(Base):
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_Q)
+ txrcv = txssn.receiver(TX_ACK_QC)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver(TX_ACK_Q)
+ rcv = self.ssn.receiver(TX_ACK_QD)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -405,7 +408,7 @@ class SessionTests(Base):
except Disconnected:
pass
-RECEIVER_Q = 'test-receiver-queue; {create: always}'
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
class ReceiverTests(Base):
@@ -482,28 +485,6 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
- def testPending(self):
- self.rcv.capacity = UNLIMITED
- assert self.rcv.pending() == 0
-
- for i in range(3):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 3
-
- for i in range(3, 10):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 10
-
- self.drain(self.rcv, limit=3)
- assert self.rcv.pending() == 7
-
- self.drain(self.rcv)
- assert self.rcv.pending() == 0
-
- self.ssn.acknowledge()
-
def testCapacity(self):
self.rcv.capacity = 5
self.assertPending(self.rcv, 0)
@@ -537,8 +518,75 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
+ def testPending(self):
+ self.rcv.capacity = UNLIMITED
+ assert self.rcv.pending() == 0
+
+ for i in range(3):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 3
+
+ for i in range(3, 10):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 10
+
+ self.drain(self.rcv, limit=3)
+ assert self.rcv.pending() == 7
+
+ self.drain(self.rcv)
+ assert self.rcv.pending() == 0
+
+ self.ssn.acknowledge()
+
# XXX: need testClose
+class AddressTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testDeleteBySender(self):
+ snd = self.ssn.sender("test-delete; {create: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete; {delete: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete")
+ try:
+ snd.send("ping")
+ except SendError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteByReceiver(self):
+ rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+ try:
+ rcv.fetch(0)
+ except Empty:
+ pass
+ rcv.close()
+
+ try:
+ self.ssn.receiver("test-delete")
+ except SendError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteSpecial(self):
+ snd = self.ssn.sender("amq.topic; {delete: always}")
+ snd.send("asdf")
+ try:
+ snd.close()
+ except SessionError, e:
+ assert "Cannot delete default exchange" in str(e)
+ # XXX: need to figure out close after error
+ self.conn._remove_session(self.ssn)
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -606,7 +654,7 @@ class AddressErrorTests(Base):
self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
lambda e: "unrecognized characters" in str(e))
-SENDER_Q = 'test-sender-q; {create: always}'
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
class SenderTests(Base):
@@ -715,7 +763,7 @@ class MessageTests(Base):
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
-ECHO_Q = 'test-message-echo-queue; {create: always}'
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
class MessageEchoTests(Base):