summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/qpid/tests/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/qpid/tests/messaging.py')
-rw-r--r--trunk/qpid/python/qpid/tests/messaging.py929
1 files changed, 0 insertions, 929 deletions
diff --git a/trunk/qpid/python/qpid/tests/messaging.py b/trunk/qpid/python/qpid/tests/messaging.py
deleted file mode 100644
index f2a270192e..0000000000
--- a/trunk/qpid/python/qpid/tests/messaging.py
+++ /dev/null
@@ -1,929 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# setup, usage, teardown, errors(sync), errors(async), stress, soak,
-# boundary-conditions, config
-
-import time
-from qpid import compat
-from qpid.tests import Test
-from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
- UNLIMITED, uuid4
-from Queue import Queue, Empty as QueueEmpty
-
-class Base(Test):
-
- def setup_connection(self):
- return None
-
- def setup_session(self):
- return None
-
- def setup_sender(self):
- return None
-
- def setup_receiver(self):
- return None
-
- def setup(self):
- self.test_id = uuid4()
- self.broker = self.config.broker
- try:
- self.conn = self.setup_connection()
- except ConnectError, e:
- raise Skipped(e)
- self.ssn = self.setup_session()
- self.snd = self.setup_sender()
- if self.snd is not None:
- self.snd.durable = self.durable()
- self.rcv = self.setup_receiver()
-
- def teardown(self):
- if self.conn is not None and self.conn.connected():
- self.conn.close()
-
- def content(self, base, count = None):
- if count is None:
- return "%s[%s]" % (base, self.test_id)
- else:
- return "%s[%s, %s]" % (base, count, self.test_id)
-
- def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always, delete: always}'
- # send a message
- sender = ssn.sender(PING_Q, durable=self.durable())
- content = self.content("ping")
- sender.send(content)
- receiver = ssn.receiver(PING_Q)
- msg = receiver.fetch(0)
- ssn.acknowledge()
- assert msg.content == content, "expected %r, got %r" % (content, msg.content)
-
- def drain(self, rcv, limit=None, timeout=0, expected=None):
- contents = []
- try:
- while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(timeout=timeout).content)
- except Empty:
- pass
- if expected is not None:
- assert expected == contents, "expected %s, got %s" % (expected, contents)
- return contents
-
- def assertEmpty(self, rcv):
- contents = self.drain(rcv)
- assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
-
- def assertPending(self, rcv, expected):
- p = rcv.pending()
- assert p == expected, "expected %s, got %s" % (expected, p)
-
- def sleep(self):
- time.sleep(self.delay())
-
- def delay(self):
- return float(self.config.defines.get("delay", "2"))
-
- def get_bool(self, name):
- return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
-
- def durable(self):
- return self.get_bool("durable")
-
- def reconnect(self):
- return self.get_bool("reconnect")
-
-class SetupTests(Base):
-
- def testOpen(self):
- # XXX: need to flesh out URL support/syntax
- self.conn = Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
- self.ping(self.conn.session())
-
- def testConnect(self):
- # XXX: need to flesh out URL support/syntax
- self.conn = Connection(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
- self.conn.connect()
- self.ping(self.conn.session())
-
- def testConnectError(self):
- try:
- self.conn = Connection.open("localhost", 0)
- assert False, "connect succeeded"
- except ConnectError, e:
- # XXX: should verify that e includes appropriate diagnostic info
- pass
-
-class ConnectionTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def testSessionAnon(self):
- ssn1 = self.conn.session()
- ssn2 = self.conn.session()
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
-
- def testSessionNamed(self):
- ssn1 = self.conn.session("one")
- ssn2 = self.conn.session("two")
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
- assert ssn1 is self.conn.session("one")
- assert ssn2 is self.conn.session("two")
-
- def testDisconnect(self):
- ssn = self.conn.session()
- self.ping(ssn)
- self.conn.disconnect()
- try:
- self.ping(ssn)
- assert False, "ping succeeded"
- except Disconnected:
- # this is the expected failure when pinging on a disconnected
- # connection
- pass
- self.conn.connect()
- self.ping(ssn)
-
- def testClose(self):
- self.conn.close()
- assert not self.conn.connected()
-
-ACK_QC = 'test-ack-queue; {create: always}'
-ACK_QD = 'test-ack-queue; {delete: always}'
-
-class SessionTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def testSender(self):
- snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
- durable=self.durable())
- snd2 = self.ssn.sender(snd.target, durable=self.durable())
- assert snd is not snd2
- snd2.close()
-
- content = self.content("testSender")
- snd.send(content)
- rcv = self.ssn.receiver(snd.target)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
-
- def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
- rcv2 = self.ssn.receiver(rcv.source)
- assert rcv is not rcv2
- rcv2.close()
-
- content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source, durable=self.durable())
- snd.send(content)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
- snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
-
- def testNextReceiver(self):
- ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
- rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
-
- snd = self.ssn.sender(ADDR)
-
- msgs = []
- for i in range(10):
- content = self.content("testNextReceiver", i)
- snd.send(content)
- msgs.append(content)
-
- fetched = []
- try:
- while True:
- rcv = self.ssn.next_receiver(timeout=self.delay())
- assert rcv in (rcv1, rcv2, rcv3)
- assert rcv.pending() > 0
- fetched.append(rcv.fetch().content)
- except Empty:
- pass
- assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
- self.ssn.acknowledge()
-
- # XXX, we need a convenient way to assert that required queues are
- # empty on setup, and possibly also to drain queues on teardown
- def ackTest(self, acker, ack_capacity=None):
- # send a bunch of messages
- snd = self.ssn.sender(ACK_QC, durable=self.durable())
- contents = [self.content("ackTest", i) for i in range(15)]
- for c in contents:
- snd.send(c)
-
- # drain the queue, verify the messages are there and then close
- # without acking
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- self.ssn.close()
-
- # drain the queue again, verify that they are all the messages
- # were requeued, and ack this time before closing
- self.ssn = self.conn.session()
- if ack_capacity is not None:
- self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- acker(self.ssn)
- self.ssn.close()
-
- # drain the queue a final time and verify that the messages were
- # dequeued
- self.ssn = self.conn.session()
- rcv = self.ssn.receiver(ACK_QD)
- self.assertEmpty(rcv)
-
- def testAcknowledge(self):
- self.ackTest(lambda ssn: ssn.acknowledge())
-
- def testAcknowledgeAsync(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
-
- def testAcknowledgeAsyncAckCap0(self):
- try:
- try:
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
- assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
- except InsufficientCapacity:
- pass
- finally:
- self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver(ACK_QD))
- self.ssn.acknowledge()
-
- def testAcknowledgeAsyncAckCap1(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
-
- def testAcknowledgeAsyncAckCap5(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
-
- def testAcknowledgeAsyncAckCapUNLIMITED(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
-
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
- for i in range(count):
- c = self.content(base, i)
- snd.send(c)
- contents.append(c)
- snd.close()
- return contents
-
- def txTest(self, commit):
- TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
- TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
- txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
- txrcv = txssn.receiver(TX_Q)
- txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
- rcv = self.ssn.receiver(txrcv.source)
- copy_rcv = self.ssn.receiver(txsnd.target)
- self.assertEmpty(copy_rcv)
- for i in range(3):
- m = txrcv.fetch(0)
- txsnd.send(m)
- self.assertEmpty(copy_rcv)
- txssn.acknowledge()
- if commit:
- txssn.commit()
- self.assertEmpty(rcv)
- assert contents == self.drain(copy_rcv)
- else:
- txssn.rollback()
- assert contents == self.drain(rcv)
- self.assertEmpty(copy_rcv)
- self.ssn.acknowledge()
-
- def testCommit(self):
- self.txTest(True)
-
- def testRollback(self):
- self.txTest(False)
-
- def txTestSend(self, commit):
- TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
- txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
- rcv = self.ssn.receiver(TX_SEND_Q)
- self.assertEmpty(rcv)
-
- if commit:
- txssn.commit()
- assert contents == self.drain(rcv)
- self.ssn.acknowledge()
- else:
- txssn.rollback()
- self.assertEmpty(rcv)
- txssn.commit()
- self.assertEmpty(rcv)
-
- def testCommitSend(self):
- self.txTestSend(True)
-
- def testRollbackSend(self):
- self.txTestSend(False)
-
- def txTestAck(self, commit):
- TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
- TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- assert contents == self.drain(txrcv)
-
- if commit:
- txssn.acknowledge()
- else:
- txssn.rollback()
- drained = self.drain(txrcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
- txssn.acknowledge()
- txssn.rollback()
- assert contents == self.drain(txrcv)
- txssn.commit() # commit without ack
- self.assertEmpty(txrcv)
-
- txssn.close()
-
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- assert contents == self.drain(txrcv)
- txssn.acknowledge()
- txssn.commit()
- rcv = self.ssn.receiver(TX_ACK_QD)
- self.assertEmpty(rcv)
- txssn.close()
- self.assertEmpty(rcv)
-
- def testCommitAck(self):
- self.txTestAck(True)
-
- def testRollbackAck(self):
- self.txTestAck(False)
-
- def testClose(self):
- self.ssn.close()
- try:
- self.ping(self.ssn)
- assert False, "ping succeeded"
- except Disconnected:
- pass
-
-RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
-
-class ReceiverTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(RECEIVER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(RECEIVER_Q)
-
- def send(self, base, count = None):
- content = self.content(base, count)
- self.snd.send(content)
- return content
-
- def testFetch(self):
- try:
- msg = self.rcv.fetch(0)
- assert False, "unexpected message: %s" % msg
- except Empty:
- pass
- try:
- start = time.time()
- msg = self.rcv.fetch(self.delay())
- assert False, "unexpected message: %s" % msg
- except Empty:
- elapsed = time.time() - start
- assert elapsed >= self.delay()
-
- one = self.send("testFetch", 1)
- two = self.send("testFetch", 2)
- three = self.send("testFetch", 3)
- msg = self.rcv.fetch(0)
- assert msg.content == one
- msg = self.rcv.fetch(self.delay())
- assert msg.content == two
- msg = self.rcv.fetch()
- assert msg.content == three
- self.ssn.acknowledge()
-
- def testCapacityIncrease(self):
- content = self.send("testCapacityIncrease")
- self.sleep()
- assert self.rcv.pending() == 0
- self.rcv.capacity = UNLIMITED
- self.sleep()
- assert self.rcv.pending() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == content
- assert self.rcv.pending() == 0
- self.ssn.acknowledge()
-
- def testCapacityDecrease(self):
- self.rcv.capacity = UNLIMITED
- one = self.send("testCapacityDecrease", 1)
- self.sleep()
- assert self.rcv.pending() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == one
-
- self.rcv.capacity = 0
-
- two = self.send("testCapacityDecrease", 2)
- self.sleep()
- assert self.rcv.pending() == 0
- msg = self.rcv.fetch(0)
- assert msg.content == two
-
- self.ssn.acknowledge()
-
- def testCapacity(self):
- self.rcv.capacity = 5
- self.assertPending(self.rcv, 0)
-
- for i in range(15):
- self.send("testCapacity", i)
- self.sleep()
- self.assertPending(self.rcv, 5)
-
- self.drain(self.rcv, limit = 5)
- self.sleep()
- self.assertPending(self.rcv, 5)
-
- drained = self.drain(self.rcv)
- assert len(drained) == 10, "%s, %s" % (len(drained), drained)
- self.assertPending(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testCapacityUNLIMITED(self):
- self.rcv.capacity = UNLIMITED
- self.assertPending(self.rcv, 0)
-
- for i in range(10):
- self.send("testCapacityUNLIMITED", i)
- self.sleep()
- self.assertPending(self.rcv, 10)
-
- self.drain(self.rcv)
- self.assertPending(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testPending(self):
- self.rcv.capacity = UNLIMITED
- assert self.rcv.pending() == 0
-
- for i in range(3):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 3
-
- for i in range(3, 10):
- self.send("testPending", i)
- self.sleep()
- assert self.rcv.pending() == 10
-
- self.drain(self.rcv, limit=3)
- assert self.rcv.pending() == 7
-
- self.drain(self.rcv)
- assert self.rcv.pending() == 0
-
- self.ssn.acknowledge()
-
- # XXX: need testClose
-
-class AddressTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def testBadOption(self):
- snd = self.ssn.sender("test-bad-option; {create: always, node-properties: {this-property-does-not-exist: 3}}")
- try:
- snd.send("ping")
- except SendError, e:
- assert "unrecognized option" in str(e)
-
- def testCreateQueue(self):
- snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node-properties: {type: queue, durable: False, "
- "x-properties: {auto_delete: true}}}")
- content = self.content("testCreateQueue")
- snd.send(content)
- rcv = self.ssn.receiver("test-create-queue")
- self.drain(rcv, expected=[content])
-
- def createExchangeTest(self, props=""):
- addr = """test-create-exchange; {
- create: always,
- delete: always,
- node-properties: {
- type: topic,
- durable: False,
- x-properties: {auto_delete: true, %s}
- }
- }""" % props
- snd = self.ssn.sender(addr)
- snd.send("ping")
- rcv1 = self.ssn.receiver("test-create-exchange/first")
- rcv2 = self.ssn.receiver("test-create-exchange/first")
- rcv3 = self.ssn.receiver("test-create-exchange/second")
- for r in (rcv1, rcv2, rcv3):
- try:
- r.fetch(0)
- assert False
- except Empty:
- pass
- msg1 = Message(self.content("testCreateExchange", 1), subject="first")
- msg2 = Message(self.content("testCreateExchange", 2), subject="second")
- snd.send(msg1)
- snd.send(msg2)
- self.drain(rcv1, expected=[msg1.content])
- self.drain(rcv2, expected=[msg1.content])
- self.drain(rcv3, expected=[msg2.content])
-
- def testCreateExchange(self):
- self.createExchangeTest()
-
- def testCreateExchangeDirect(self):
- self.createExchangeTest("type: direct")
-
- def testCreateExchangeTopic(self):
- self.createExchangeTest("type: topic")
-
- def testDeleteBySender(self):
- snd = self.ssn.sender("test-delete; {create: always}")
- snd.send("ping")
- snd.close()
- snd = self.ssn.sender("test-delete; {delete: always}")
- snd.send("ping")
- snd.close()
- snd = self.ssn.sender("test-delete")
- try:
- snd.send("ping")
- except SendError, e:
- assert "no such queue" in str(e)
-
- def testDeleteByReceiver(self):
- rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
- try:
- rcv.fetch(0)
- except Empty:
- pass
- rcv.close()
-
- try:
- self.ssn.receiver("test-delete")
- except SendError, e:
- assert "no such queue" in str(e)
-
- def testDeleteSpecial(self):
- snd = self.ssn.sender("amq.topic; {delete: always}")
- snd.send("asdf")
- try:
- snd.close()
- except SessionError, e:
- assert "Cannot delete default exchange" in str(e)
- # XXX: need to figure out close after error
- self.conn._remove_session(self.ssn)
-
- def testBindings(self):
- snd = self.ssn.sender("""
-test-bindings-queue; {
- create: always,
- delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
- }
- }
-}
-""")
- snd.send("one")
- snd_a = self.ssn.sender("amq.topic/a.foo")
- snd_b = self.ssn.sender("amq.direct/b")
- snd_c = self.ssn.sender("amq.topic/c.bar")
- snd_a.send("two")
- snd_b.send("three")
- snd_c.send("four")
- rcv = self.ssn.receiver("test-bindings-queue")
- self.drain(rcv, expected=["one", "two", "three", "four"])
-
-NOSUCH_Q = "this-queue-should-not-exist"
-UNPARSEABLE_ADDR = "name/subject; {bad options"
-UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
-
-class AddressErrorTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def sendErrorTest(self, addr, exc, check=lambda e: True):
- snd = self.ssn.sender(addr, durable=self.durable())
- try:
- snd.send("hello")
- assert False, "send succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
- snd.close()
-
- def fetchErrorTest(self, addr, exc, check=lambda e: True):
- rcv = self.ssn.receiver(addr)
- try:
- rcv.fetch(timeout=0)
- assert False, "fetch succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
- rcv.close()
-
- def testNoneTarget(self):
- # XXX: should have specific exception for this
- self.sendErrorTest(None, SendError)
-
- def testNoneSource(self):
- # XXX: should have specific exception for this
- self.fetchErrorTest(None, ReceiveError)
-
- def testNoTarget(self):
- # XXX: should have specific exception for this
- self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
-
- def testNoSource(self):
- # XXX: should have specific exception for this
- self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
-
- def testUnparseableTarget(self):
- # XXX: should have specific exception for this
- self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
- lambda e: "expecting COLON" in str(e))
-
- def testUnparseableSource(self):
- # XXX: should have specific exception for this
- self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
- lambda e: "expecting COLON" in str(e))
-
- def testUnlexableTarget(self):
- # XXX: should have specific exception for this
- self.sendErrorTest(UNLEXABLE_ADDR, SendError,
- lambda e: "unrecognized characters" in str(e))
-
- def testUnlexableSource(self):
- # XXX: should have specific exception for this
- self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
- lambda e: "unrecognized characters" in str(e))
-
-SENDER_Q = 'test-sender-q; {create: always, delete: always}'
-
-class SenderTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(SENDER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(SENDER_Q)
-
- def checkContent(self, content):
- self.snd.send(content)
- msg = self.rcv.fetch(0)
- assert msg.content == content
-
- out = Message(content)
- self.snd.send(out)
- echo = self.rcv.fetch(0)
- assert out.content == echo.content
- assert echo.content == msg.content
- self.ssn.acknowledge()
-
- def testSendString(self):
- self.checkContent(self.content("testSendString"))
-
- def testSendList(self):
- self.checkContent(["testSendList", 1, 3.14, self.test_id])
-
- def testSendMap(self):
- self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
-
- def asyncTest(self, capacity):
- self.snd.capacity = capacity
- msgs = [self.content("asyncTest", i) for i in range(15)]
- for m in msgs:
- self.snd.send(m, sync=False)
- drained = self.drain(self.rcv, timeout=self.delay())
- assert msgs == drained, "expected %s, got %s" % (msgs, drained)
- self.ssn.acknowledge()
-
- def testSendAsyncCapacity0(self):
- try:
- self.asyncTest(0)
- assert False, "send shouldn't succeed with zero capacity"
- except InsufficientCapacity:
- # this is expected
- pass
-
- def testSendAsyncCapacity1(self):
- self.asyncTest(1)
-
- def testSendAsyncCapacity5(self):
- self.asyncTest(5)
-
- def testSendAsyncCapacityUNLIMITED(self):
- self.asyncTest(UNLIMITED)
-
- def testCapacityTimeout(self):
- self.snd.capacity = 1
- msgs = []
- caught = False
- while len(msgs) < 100:
- m = self.content("testCapacity", len(msgs))
- try:
- self.snd.send(m, sync=False, timeout=0)
- msgs.append(m)
- except InsufficientCapacity:
- caught = True
- break
- self.snd.sync()
- self.drain(self.rcv, expected=msgs)
- self.ssn.acknowledge()
- assert caught, "did not exceed capacity"
-
-class MessageTests(Base):
-
- def testCreateString(self):
- m = Message("string")
- assert m.content == "string"
- assert m.content_type is None
-
- def testCreateUnicode(self):
- m = Message(u"unicode")
- assert m.content == u"unicode"
- assert m.content_type == "text/plain"
-
- def testCreateMap(self):
- m = Message({})
- assert m.content == {}
- assert m.content_type == "amqp/map"
-
- def testCreateList(self):
- m = Message([])
- assert m.content == []
- assert m.content_type == "amqp/list"
-
- def testContentTypeOverride(self):
- m = Message()
- m.content_type = "text/html; charset=utf8"
- m.content = u"<html/>"
- assert m.content_type == "text/html; charset=utf8"
-
-ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
-
-class MessageEchoTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(ECHO_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(ECHO_Q)
-
- def check(self, msg):
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
-
- assert msg.id == echo.id
- assert msg.subject == echo.subject
- assert msg.user_id == echo.user_id
- assert msg.to == echo.to
- assert msg.reply_to == echo.reply_to
- assert msg.correlation_id == echo.correlation_id
- assert msg.properties == echo.properties
- assert msg.content_type == echo.content_type
- assert msg.content == echo.content, "%s, %s" % (msg, echo)
-
- self.ssn.acknowledge(echo)
-
- def testStringContent(self):
- self.check(Message("string"))
-
- def testUnicodeContent(self):
- self.check(Message(u"unicode"))
-
-
- TEST_MAP = {"key1": "string",
- "key2": u"unicode",
- "key3": 3,
- "key4": -3,
- "key5": 3.14,
- "key6": -3.14,
- "key7": ["one", 2, 3.14],
- "key8": [],
- "key9": {"sub-key0": 3}}
-
- def testMapContent(self):
- self.check(Message(MessageEchoTests.TEST_MAP))
-
- def testListContent(self):
- self.check(Message([]))
- self.check(Message([1, 2, 3]))
- self.check(Message(["one", 2, 3.14, {"four": 4}]))
-
- def testProperties(self):
- msg = Message()
- msg.to = "to-address"
- msg.subject = "subject"
- msg.correlation_id = str(self.test_id)
- msg.properties = MessageEchoTests.TEST_MAP
- msg.reply_to = "reply-address"
- self.check(msg)
-
-class TestTestsXXX(Test):
-
- def testFoo(self):
- print "this test has output"
-
- def testBar(self):
- print "this test "*8
- print "has"*10
- print "a"*75
- print "lot of"*10
- print "output"*10
-
- def testQux(self):
- import sys
- sys.stdout.write("this test has output with no newline")
-
- def testQuxFail(self):
- import sys
- sys.stdout.write("this test has output with no newline")
- fdsa