summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/tests/messaging/endpoints.py')
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py1335
1 files changed, 1335 insertions, 0 deletions
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
new file mode 100644
index 0000000000..db5ec03df2
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -0,0 +1,1335 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import errno, os, socket, sys, time
+from qpid import compat
+from qpid.compat import set
+from qpid.messaging import *
+from qpid.messaging.transports import TRANSPORTS
+from qpid.tests.messaging import Base
+from threading import Thread
+
+class SetupTests(Base):
+
+ def testEstablish(self):
+ self.conn = Connection.establish(self.broker, **self.connection_options())
+ self.ping(self.conn.session())
+
+ def testOpen(self):
+ self.conn = Connection(self.broker, **self.connection_options())
+ self.conn.open()
+ self.ping(self.conn.session())
+
+ def testOpenReconnectURLs(self):
+ options = self.connection_options()
+ options["reconnect_urls"] = [self.broker, self.broker]
+ self.conn = Connection(self.broker, **options)
+ self.conn.open()
+ self.ping(self.conn.session())
+
+ def testTcpNodelay(self):
+ self.conn = Connection.establish(self.broker, tcp_nodelay=True)
+ assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+
+ def testConnectError(self):
+ try:
+ # Specifying port 0 yields a bad address on Windows; port 4 is unassigned
+ self.conn = Connection.establish("localhost:4")
+ assert False, "connect succeeded"
+ except ConnectError, e:
+ assert "refused" in str(e)
+
+ def testGetError(self):
+ self.conn = Connection("localhost:0")
+ try:
+ self.conn.open()
+ assert False, "connect succeeded"
+ except ConnectError, e:
+ assert self.conn.get_error() == e
+
+ def use_fds(self):
+ fds = []
+ try:
+ while True:
+ fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY))
+ except OSError, e:
+ if e.errno != errno.EMFILE:
+ raise e
+ else:
+ return fds
+
+ def testOpenCloseResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection.establish(self.broker, **self.connection_options())
+ conn.close()
+ finally:
+ while fds:
+ os.close(fds.pop())
+
+ def testOpenFailResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection("localhost:0", **self.connection_options())
+ # XXX: we need to force a waiter to be created for this test
+ # to work
+ conn._lock.acquire()
+ conn._wait(lambda: False, timeout=0.001)
+ conn._lock.release()
+ try:
+ conn.open()
+ except ConnectError, e:
+ pass
+ finally:
+ while fds:
+ os.close(fds.pop())
+
+ def testReconnect(self):
+ options = self.connection_options()
+ real = TRANSPORTS["tcp"]
+
+ class flaky:
+
+ def __init__(self, conn, host, port):
+ self.real = real(conn, host, port)
+ self.sent_count = 0
+ self.recv_count = 0
+
+ def fileno(self):
+ return self.real.fileno()
+
+ def reading(self, reading):
+ return self.real.reading(reading)
+
+ def writing(self, writing):
+ return self.real.writing(writing)
+
+ def send(self, bytes):
+ if self.sent_count > 2048:
+ raise socket.error("fake error")
+ n = self.real.send(bytes)
+ self.sent_count += n
+ return n
+
+ def recv(self, n):
+ if self.recv_count > 2048:
+ return ""
+ bytes = self.real.recv(n)
+ self.recv_count += len(bytes)
+ return bytes
+
+ def close(self):
+ self.real.close()
+
+ TRANSPORTS["flaky"] = flaky
+
+ options["reconnect"] = True
+ options["reconnect_interval"] = 0
+ options["reconnect_limit"] = 100
+ options["reconnect_log"] = False
+ options["transport"] = "flaky"
+
+ self.conn = Connection.establish(self.broker, **options)
+ ssn = self.conn.session()
+ snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
+ rcv = ssn.receiver(snd.target)
+
+ msgs = [self.message("testReconnect", i) for i in range(20)]
+ for m in msgs:
+ snd.send(m)
+
+ content = set()
+ drained = []
+ duplicates = []
+ try:
+ while True:
+ m = rcv.fetch(timeout=0)
+ if m.content not in content:
+ content.add(m.content)
+ drained.append(m)
+ else:
+ duplicates.append(m)
+ ssn.acknowledge(m)
+ except Empty:
+ pass
+ # XXX: apparently we don't always get duplicates, should figure out why
+ #assert duplicates, "no duplicates"
+ assert len(drained) == len(msgs)
+ for m, d in zip(msgs, drained):
+ # XXX: we should figure out how to provide proper end to end
+ # redelivered
+ self.assertEcho(m, d, d.redelivered)
+
+class ConnectionTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def testCheckClosed(self):
+ assert not self.conn.check_closed()
+
+ def testSessionAnon(self):
+ ssn1 = self.conn.session()
+ ssn2 = self.conn.session()
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+
+ def testSessionNamed(self):
+ ssn1 = self.conn.session("one")
+ ssn2 = self.conn.session("two")
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+ assert ssn1 is self.conn.session("one")
+ assert ssn2 is self.conn.session("two")
+
+ def testDetach(self):
+ ssn = self.conn.session()
+ self.ping(ssn)
+ self.conn.detach()
+ try:
+ self.ping(ssn)
+ assert False, "ping succeeded"
+ except Detached:
+ # this is the expected failure when pinging on a detached
+ # connection
+ pass
+ self.conn.attach()
+ self.ping(ssn)
+
+ def testClose(self):
+ self.conn.close()
+ assert not self.conn.attached()
+
+ def testSimultaneousClose(self):
+ ssns = [self.conn.session() for i in range(3)]
+ for s in ssns:
+ for i in range(3):
+ s.receiver("amq.topic")
+ s.sender("amq.topic")
+
+ def closer(errors):
+ try:
+ self.conn.close()
+ except:
+ _, e, _ = sys.exc_info()
+ errors.append(compat.format_exc(e))
+
+ t1_errors = []
+ t2_errors = []
+ t1 = Thread(target=lambda: closer(t1_errors))
+ t2 = Thread(target=lambda: closer(t2_errors))
+ t1.start()
+ t2.start()
+ t1.join(self.delay())
+ t2.join(self.delay())
+
+ assert not t1_errors, t1_errors[0]
+ assert not t2_errors, t2_errors[0]
+
+class hangable:
+
+ def __init__(self, conn, host, port):
+ self.tcp = TRANSPORTS["tcp"](conn, host, port)
+ self.hung = False
+
+ def hang(self):
+ self.hung = True
+
+ def fileno(self):
+ return self.tcp.fileno()
+
+ def reading(self, reading):
+ if self.hung:
+ return True
+ else:
+ return self.tcp.reading(reading)
+
+ def writing(self, writing):
+ if self.hung:
+ return False
+ else:
+ return self.tcp.writing(writing)
+
+ def send(self, bytes):
+ if self.hung:
+ return 0
+ else:
+ return self.tcp.send(bytes)
+
+ def recv(self, n):
+ if self.hung:
+ return ""
+ else:
+ return self.tcp.recv(n)
+
+ def close(self):
+ self.tcp.close()
+
+TRANSPORTS["hangable"] = hangable
+
+class TimeoutTests(Base):
+
+ def setup_connection(self):
+ options = self.connection_options()
+ options["transport"] = "hangable"
+ return Connection.establish(self.broker, **options)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("amq.topic")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}")
+
+ def teardown_connection(self, conn):
+ try:
+ conn.detach(timeout=0)
+ except Timeout:
+ pass
+
+ def hang(self):
+ self.conn._driver._transport.hang()
+
+ def timeoutTest(self, method):
+ self.hang()
+ try:
+ method(timeout=self.delay())
+ assert False, "did not time out"
+ except Timeout:
+ pass
+
+ def testSenderSync(self):
+ self.snd.send(self.content("testSenderSync"), sync=False)
+ self.timeoutTest(self.snd.sync)
+
+ def testSenderClose(self):
+ self.snd.send(self.content("testSenderClose"), sync=False)
+ self.timeoutTest(self.snd.close)
+
+ def testReceiverClose(self):
+ self.timeoutTest(self.rcv.close)
+
+ def testSessionSync(self):
+ self.snd.send(self.content("testSessionSync"), sync=False)
+ self.timeoutTest(self.ssn.sync)
+
+ def testSessionClose(self):
+ self.timeoutTest(self.ssn.close)
+
+ def testConnectionDetach(self):
+ self.timeoutTest(self.conn.detach)
+
+ def testConnectionClose(self):
+ self.timeoutTest(self.conn.close)
+
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
+
+class SessionTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testSender(self):
+ snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
+ assert snd is not snd2
+ snd2.close()
+
+ content = self.content("testSender")
+ snd.send(content)
+ rcv = self.ssn.receiver(snd.target)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testReceiver(self):
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+ rcv2 = self.ssn.receiver(rcv.source)
+ assert rcv is not rcv2
+ rcv2.close()
+
+ content = self.content("testReceiver")
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
+ snd.send(content)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+ snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
+
+ def testDetachedReceiver(self):
+ self.conn.detach()
+ rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
+ m = self.content("testDetachedReceiver")
+ self.conn.attach()
+ snd = self.ssn.sender("test-dis-rcv-queue")
+ snd.send(m)
+ self.drain(rcv, expected=[m])
+
+ def testNextReceiver(self):
+ ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
+ rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+
+ snd = self.ssn.sender(ADDR)
+
+ msgs = []
+ for i in range(10):
+ content = self.content("testNextReceiver", i)
+ snd.send(content)
+ msgs.append(content)
+
+ fetched = []
+ try:
+ while True:
+ rcv = self.ssn.next_receiver(timeout=self.delay())
+ assert rcv in (rcv1, rcv2, rcv3)
+ assert rcv.available() > 0
+ fetched.append(rcv.fetch().content)
+ except Empty:
+ pass
+ assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
+ self.ssn.acknowledge()
+ #we set the capacity to 0 to prevent the deletion of the queue -
+ #triggered the deletion policy when the first receiver is closed -
+ #resulting in session exceptions being issued for the remaining
+ #active subscriptions:
+ for r in [rcv1, rcv2, rcv3]:
+ r.capacity = 0
+
+ # XXX, we need a convenient way to assert that required queues are
+ # empty on setup, and possibly also to drain queues on teardown
+ def ackTest(self, acker, ack_capacity=None):
+ # send a bunch of messages
+ snd = self.ssn.sender(ACK_QC, durable=self.durable())
+ contents = [self.content("ackTest", i) for i in range(15)]
+ for c in contents:
+ snd.send(c)
+
+ # drain the queue, verify the messages are there and then close
+ # without acking
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ self.ssn.close()
+
+ # drain the queue again, verify that they are all the messages
+ # were requeued, and ack this time before closing
+ self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
+ self.ssn.close()
+
+ # drain the queue a final time and verify that the messages were
+ # dequeued
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver(ACK_QD)
+ self.assertEmpty(rcv)
+
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver(ACK_QD))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-declare: {
+ alternate-exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
+ for i in range(count):
+ c = self.message(base, i)
+ snd.send(c)
+ messages.append(c)
+ snd.close()
+ return messages
+
+ def txTest(self, commit):
+ TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+ TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
+ txssn = self.conn.session(transactional=True)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
+ rcv = self.ssn.receiver(txrcv.source)
+ copy_rcv = self.ssn.receiver(txsnd.target)
+ self.assertEmpty(copy_rcv)
+ for i in range(3):
+ m = txrcv.fetch(0)
+ txsnd.send(m)
+ self.assertEmpty(copy_rcv)
+ txssn.acknowledge()
+ if commit:
+ txssn.commit()
+ self.assertEmpty(rcv)
+ self.drain(copy_rcv, expected=messages)
+ else:
+ txssn.rollback()
+ self.drain(rcv, expected=messages, redelivered=True)
+ self.assertEmpty(copy_rcv)
+ self.ssn.acknowledge()
+
+ def testCommit(self):
+ self.txTest(True)
+
+ def testRollback(self):
+ self.txTest(False)
+
+ def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
+ txssn = self.conn.session(transactional=True)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
+ self.assertEmpty(rcv)
+
+ if commit:
+ txssn.commit()
+ self.drain(rcv, expected=messages)
+ self.ssn.acknowledge()
+ else:
+ txssn.rollback()
+ self.assertEmpty(rcv)
+ txssn.commit()
+ self.assertEmpty(rcv)
+
+ def testCommitSend(self):
+ self.txTestSend(True)
+
+ def testRollbackSend(self):
+ self.txTestSend(False)
+
+ def txTestAck(self, commit):
+ TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ self.assertEmpty(txrcv)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
+
+ if commit:
+ txssn.acknowledge()
+ else:
+ txssn.rollback()
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.acknowledge()
+ txssn.rollback()
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.commit() # commit without ack
+ self.assertEmpty(txrcv)
+
+ txssn.close()
+
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.acknowledge()
+ txssn.commit()
+ rcv = self.ssn.receiver(TX_ACK_QD)
+ self.assertEmpty(rcv)
+ txssn.close()
+ self.assertEmpty(rcv)
+
+ def testCommitAck(self):
+ self.txTestAck(True)
+
+ def testRollbackAck(self):
+ self.txTestAck(False)
+
+ def testDoubleCommit(self):
+ ssn = self.conn.session(transactional=True)
+ snd = ssn.sender("amq.direct")
+ rcv = ssn.receiver("amq.direct")
+ msgs = [self.message("testDoubleCommit", i) for i in range(3)]
+ for m in msgs:
+ snd.send(m)
+ ssn.commit()
+ self.drain(rcv, expected=msgs)
+ ssn.acknowledge()
+ ssn.commit()
+
+ def testClose(self):
+ self.ssn.close()
+ try:
+ self.ping(self.ssn)
+ assert False, "ping succeeded"
+ except Detached:
+ pass
+
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
+
+class ReceiverTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(RECEIVER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(RECEIVER_Q)
+
+ def send(self, base, count = None, sync=True):
+ content = self.content(base, count)
+ self.snd.send(content, sync=sync)
+ return content
+
+ def testFetch(self):
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ pass
+ try:
+ start = time.time()
+ msg = self.rcv.fetch(self.delay())
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ elapsed = time.time() - start
+ assert elapsed >= self.delay()
+
+ one = self.send("testFetch", 1)
+ two = self.send("testFetch", 2)
+ three = self.send("testFetch", 3)
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+ msg = self.rcv.fetch(self.delay())
+ assert msg.content == two
+ msg = self.rcv.fetch()
+ assert msg.content == three
+ self.ssn.acknowledge()
+
+ def fetchFromClosedTest(self, entry):
+ entry.close()
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+
+ def testFetchFromClosedReceiver(self):
+ self.fetchFromClosedTest(self.rcv)
+
+ def testFetchFromClosedSession(self):
+ self.fetchFromClosedTest(self.ssn)
+
+ def testFetchFromClosedConnection(self):
+ self.fetchFromClosedTest(self.conn)
+
+ def fetchFromConcurrentCloseTest(self, entry):
+ def closer():
+ self.sleep()
+ entry.close()
+ t = Thread(target=closer)
+ t.start()
+ try:
+ msg = self.rcv.fetch()
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+ t.join()
+
+ def testFetchFromConcurrentCloseReceiver(self):
+ self.fetchFromConcurrentCloseTest(self.rcv)
+
+ def testFetchFromConcurrentCloseSession(self):
+ self.fetchFromConcurrentCloseTest(self.ssn)
+
+ def testFetchFromConcurrentCloseConnection(self):
+ self.fetchFromConcurrentCloseTest(self.conn)
+
+ def testCapacityIncrease(self):
+ content = self.send("testCapacityIncrease")
+ self.sleep()
+ assert self.rcv.available() == 0
+ self.rcv.capacity = UNLIMITED
+ self.sleep()
+ assert self.rcv.available() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+ assert self.rcv.available() == 0
+ self.ssn.acknowledge()
+
+ def testCapacityDecrease(self):
+ self.rcv.capacity = UNLIMITED
+ one = self.send("testCapacityDecrease", 1)
+ self.sleep()
+ assert self.rcv.available() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+
+ self.rcv.capacity = 0
+
+ two = self.send("testCapacityDecrease", 2)
+ self.sleep()
+ assert self.rcv.available() == 0
+ msg = self.rcv.fetch(0)
+ assert msg.content == two
+
+ self.ssn.acknowledge()
+
+ def capacityTest(self, capacity, threshold=None):
+ if threshold is not None:
+ self.rcv.threshold = threshold
+ self.rcv.capacity = capacity
+ self.assertAvailable(self.rcv, 0)
+
+ for i in range(2*capacity):
+ self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
+ self.snd.sync()
+ self.sleep()
+ self.assertAvailable(self.rcv)
+
+ first = capacity/2
+ second = capacity - first
+ self.drain(self.rcv, limit = first)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+ self.drain(self.rcv, limit = second)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+
+ drained = self.drain(self.rcv)
+ assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
+ self.assertAvailable(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testCapacity5(self):
+ self.capacityTest(5)
+
+ def testCapacity5Threshold1(self):
+ self.capacityTest(5, 1)
+
+ def testCapacity10(self):
+ self.capacityTest(10)
+
+ def testCapacity10Threshold1(self):
+ self.capacityTest(10, 1)
+
+ def testCapacity100(self):
+ self.capacityTest(100)
+
+ def testCapacity100Threshold1(self):
+ self.capacityTest(100, 1)
+
+ def testCapacityUNLIMITED(self):
+ self.rcv.capacity = UNLIMITED
+ self.assertAvailable(self.rcv, 0)
+
+ for i in range(10):
+ self.send("testCapacityUNLIMITED", i)
+ self.sleep()
+ self.assertAvailable(self.rcv, 10)
+
+ self.drain(self.rcv)
+ self.assertAvailable(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testAvailable(self):
+ self.rcv.capacity = UNLIMITED
+ assert self.rcv.available() == 0
+
+ for i in range(3):
+ self.send("testAvailable", i)
+ self.sleep()
+ assert self.rcv.available() == 3
+
+ for i in range(3, 10):
+ self.send("testAvailable", i)
+ self.sleep()
+ assert self.rcv.available() == 10
+
+ self.drain(self.rcv, limit=3)
+ assert self.rcv.available() == 7
+
+ self.drain(self.rcv)
+ assert self.rcv.available() == 0
+
+ self.ssn.acknowledge()
+
+ def testDoubleClose(self):
+ m1 = self.content("testDoubleClose", 1)
+ m2 = self.content("testDoubleClose", 2)
+
+ snd = self.ssn.sender("""test-double-close; {
+ create: always,
+ delete: sender,
+ node: {
+ type: topic
+ }
+}
+""")
+ r1 = self.ssn.receiver(snd.target)
+ r2 = self.ssn.receiver(snd.target)
+ snd.send(m1)
+ self.drain(r1, expected=[m1])
+ self.drain(r2, expected=[m1])
+ r1.close()
+ snd.send(m2)
+ self.drain(r2, expected=[m2])
+ r2.close()
+
+ # XXX: need testClose
+
+ def testMode(self):
+ msgs = [self.content("testMode", 1),
+ self.content("testMode", 2),
+ self.content("testMode", 3)]
+
+ for m in msgs:
+ self.snd.send(m)
+
+ rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+ rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+ self.drain(rb, expected=msgs)
+ self.drain(rc, expected=msgs)
+ rb2 = self.ssn.receiver(rb.source)
+ self.assertEmpty(rb2)
+ self.drain(self.rcv, expected=[])
+
+ # XXX: need testUnsettled()
+
+ def unreliabilityTest(self, mode="unreliable"):
+ msgs = [self.message("testUnreliable", i) for i in range(3)]
+ snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}")
+ rcv = self.ssn.receiver(snd.target)
+ for m in msgs:
+ snd.send(m)
+
+ # close without ack on reliable receiver, messages should be requeued
+ ssn = self.conn.session()
+ rrcv = ssn.receiver("test-unreliability-queue")
+ self.drain(rrcv, expected=msgs)
+ ssn.close()
+
+ # close without ack on unreliable receiver, messages should not be requeued
+ ssn = self.conn.session()
+ urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode)
+ self.drain(urcv, expected=msgs, redelivered=True)
+ ssn.close()
+
+ self.assertEmpty(rcv)
+
+ def testUnreliable(self):
+ self.unreliabilityTest(mode="unreliable")
+
+ def testAtMostOnce(self):
+ self.unreliabilityTest(mode="at-most-once")
+
+class AddressTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def badOption(self, options, error):
+ try:
+ self.ssn.sender("test-bad-options-snd; %s" % options)
+ assert False
+ except InvalidOption, e:
+ assert "error in options: %s" % error == str(e), e
+
+ try:
+ self.ssn.receiver("test-bad-options-rcv; %s" % options)
+ assert False
+ except InvalidOption, e:
+ assert "error in options: %s" % error == str(e), e
+
+ def testIllegalKey(self):
+ self.badOption("{create: always, node: "
+ "{this-property-does-not-exist: 3}}",
+ "node: this-property-does-not-exist: "
+ "illegal key")
+
+ def testWrongValue(self):
+ self.badOption("{create: asdf}", "create: asdf not in "
+ "('always', 'sender', 'receiver', 'never')")
+
+ def testWrongType1(self):
+ self.badOption("{node: asdf}",
+ "node: asdf is not a map")
+
+ def testWrongType2(self):
+ self.badOption("{node: {durable: []}}",
+ "node: durable: [] is not a bool")
+
+ def testCreateQueue(self):
+ snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
+ "node: {type: queue, durable: False, "
+ "x-declare: {auto_delete: true}}}")
+ content = self.content("testCreateQueue")
+ snd.send(content)
+ rcv = self.ssn.receiver("test-create-queue")
+ self.drain(rcv, expected=[content])
+
+ def createExchangeTest(self, props=""):
+ addr = """test-create-exchange; {
+ create: always,
+ delete: always,
+ node: {
+ type: topic,
+ durable: False,
+ x-declare: {auto_delete: true, %s}
+ }
+ }""" % props
+ snd = self.ssn.sender(addr)
+ snd.send("ping")
+ rcv1 = self.ssn.receiver("test-create-exchange/first")
+ rcv2 = self.ssn.receiver("test-create-exchange/first")
+ rcv3 = self.ssn.receiver("test-create-exchange/second")
+ for r in (rcv1, rcv2, rcv3):
+ try:
+ r.fetch(0)
+ assert False
+ except Empty:
+ pass
+ msg1 = Message(self.content("testCreateExchange", 1), subject="first")
+ msg2 = Message(self.content("testCreateExchange", 2), subject="second")
+ snd.send(msg1)
+ snd.send(msg2)
+ self.drain(rcv1, expected=[msg1.content])
+ self.drain(rcv2, expected=[msg1.content])
+ self.drain(rcv3, expected=[msg2.content])
+
+ def testCreateExchange(self):
+ self.createExchangeTest()
+
+ def testCreateExchangeDirect(self):
+ self.createExchangeTest("type: direct")
+
+ def testCreateExchangeTopic(self):
+ self.createExchangeTest("type: topic")
+
+ def testDeleteBySender(self):
+ snd = self.ssn.sender("test-delete; {create: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete; {delete: always}")
+ snd.send("ping")
+ snd.close()
+ try:
+ self.ssn.sender("test-delete")
+ except NotFound, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteByReceiver(self):
+ rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+ try:
+ rcv.fetch(0)
+ except Empty:
+ pass
+ rcv.close()
+
+ try:
+ self.ssn.receiver("test-delete")
+ assert False
+ except NotFound, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteSpecial(self):
+ snd = self.ssn.sender("amq.topic; {delete: always}")
+ snd.send("asdf")
+ try:
+ snd.close()
+ assert False, "successfully deleted amq.topic"
+ except SessionError, e:
+ assert "Cannot delete default exchange" in str(e)
+ # XXX: need to figure out close after error
+ self.conn._remove_session(self.ssn)
+
+ def testNodeBindingsQueue(self):
+ snd = self.ssn.sender("""
+test-node-bindings-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-bindings: [{exchange: "amq.topic", key: "a.#"},
+ {exchange: "amq.direct", key: "b"},
+ {exchange: "amq.topic", key: "c.*"}]
+ }
+}
+""")
+ snd.send("one")
+ snd_a = self.ssn.sender("amq.topic/a.foo")
+ snd_b = self.ssn.sender("amq.direct/b")
+ snd_c = self.ssn.sender("amq.topic/c.bar")
+ snd_a.send("two")
+ snd_b.send("three")
+ snd_c.send("four")
+ rcv = self.ssn.receiver("test-node-bindings-queue")
+ self.drain(rcv, expected=["one", "two", "three", "four"])
+
+ def testNodeBindingsTopic(self):
+ rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+ rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+ rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+ rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
+ snd = self.ssn.sender("""
+test-node-bindings-topic; {
+ create: always,
+ delete: always,
+ node: {
+ type: topic,
+ x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+ {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+ {queue: test-node-bindings-topic-queue-b, key: "b"},
+ {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
+ }
+}
+""")
+ m1 = Message("one")
+ m2 = Message(subject="a.foo", content="two")
+ m3 = Message(subject="b", content="three")
+ m4 = Message(subject="c.bar", content="four")
+ snd.send(m1)
+ snd.send(m2)
+ snd.send(m3)
+ snd.send(m4)
+ self.drain(rcv, expected=[m1, m2, m3, m4])
+ self.drain(rcv_a, expected=[m2])
+ self.drain(rcv_b, expected=[m3])
+ self.drain(rcv_c, expected=[m4])
+
+ def testLinkBindings(self):
+ m_a = self.message("testLinkBindings", 1, subject="a")
+ m_b = self.message("testLinkBindings", 2, subject="b")
+
+ self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+ snd = self.ssn.sender("amq.topic")
+
+ snd.send(m_a)
+ snd.send(m_b)
+ snd.close()
+
+ rcv = self.ssn.receiver("test-link-bindings-queue")
+ self.assertEmpty(rcv)
+
+ snd = self.ssn.sender("""
+amq.topic; {
+ link: {
+ x-bindings: [{queue: test-link-bindings-queue, key: a}]
+ }
+}
+""")
+
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a])
+ rcv.close()
+
+ rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+ link: {
+ x-bindings: [{exchange: "amq.topic", key: b}]
+ }
+}
+""")
+
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a, m_b])
+
+ def testSubjectOverride(self):
+ snd = self.ssn.sender("amq.topic/a")
+ rcv_a = self.ssn.receiver("amq.topic/a")
+ rcv_b = self.ssn.receiver("amq.topic/b")
+ m1 = self.content("testSubjectOverride", 1)
+ m2 = self.content("testSubjectOverride", 2)
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ self.drain(rcv_a, expected=[m1])
+ self.drain(rcv_b, expected=[m2])
+
+ def testSubjectDefault(self):
+ m1 = self.content("testSubjectDefault", 1)
+ m2 = self.content("testSubjectDefault", 2)
+ snd = self.ssn.sender("amq.topic/a")
+ rcv = self.ssn.receiver("amq.topic")
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ e1 = rcv.fetch(timeout=0)
+ e2 = rcv.fetch(timeout=0)
+ assert e1.subject == "a", "subject: %s" % e1.subject
+ assert e2.subject == "b", "subject: %s" % e2.subject
+ self.assertEmpty(rcv)
+
+ def doReliabilityTest(self, reliability, messages, expected):
+ snd = self.ssn.sender("amq.topic")
+ rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+ for m in messages:
+ snd.send(m)
+ self.conn.detach()
+ self.conn.attach()
+ self.drain(rcv, expected=expected)
+
+ def testReliabilityUnreliable(self):
+ msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+ self.doReliabilityTest("unreliable", msgs, [])
+
+ def testReliabilityAtLeastOnce(self):
+ msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+ self.doReliabilityTest("at-least-once", msgs, msgs)
+
+ def testLinkName(self):
+ msgs = [self.message("testLinkName", i) for i in range(3)]
+ snd = self.ssn.sender("amq.topic")
+ trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+ qrcv = self.ssn.receiver("test-link-name")
+ for m in msgs:
+ snd.send(m)
+ self.drain(qrcv, expected=msgs)
+
+ def testAssert1(self):
+ try:
+ snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
+ assert 0, "assertion failed to trigger"
+ except AssertionFailed, e:
+ pass
+
+ def testAssert2(self):
+ snd = self.ssn.sender("amq.topic; {assert: always}")
+
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "name/subject; {bad options"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def senderErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.sender(addr, durable=self.durable())
+ assert False, "sender creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def receiverErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.receiver(addr)
+ assert False, "receiver creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def testNoneTarget(self):
+ self.senderErrorTest(None, MalformedAddress)
+
+ def testNoneSource(self):
+ self.receiverErrorTest(None, MalformedAddress)
+
+ def testNoTarget(self):
+ self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnparseableSource(self):
+ self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnlexableTarget(self):
+ self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testUnlexableSource(self):
+ self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testInvalidMode(self):
+ self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+ InvalidOption,
+ lambda e: "not in ('browse', 'consume')" in str(e))
+
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
+
+class SenderTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(SENDER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(SENDER_Q)
+
+ def checkContent(self, content):
+ self.snd.send(content)
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+
+ out = Message(content)
+ self.snd.send(out)
+ echo = self.rcv.fetch(0)
+ assert out.content == echo.content
+ assert echo.content == msg.content
+ self.ssn.acknowledge()
+
+ def testSendString(self):
+ self.checkContent(self.content("testSendString"))
+
+ def testSendList(self):
+ self.checkContent(["testSendList", 1, 3.14, self.test_id])
+
+ def testSendMap(self):
+ self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ self.drain(self.rcv, timeout=self.delay(), expected=msgs)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.snd.sync()
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"