# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # setup, usage, teardown, errors(sync), errors(async), stress, soak, # boundary-conditions, config import time from qpid import compat from qpid.tests import Test from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): def setup_connection(self): return None def setup_session(self): return None def setup_sender(self): return None def setup_receiver(self): return None def setup(self): self.test_id = uuid4() self.broker = self.config.broker try: self.conn = self.setup_connection() except ConnectError, e: raise Skipped(e) self.ssn = self.setup_session() self.snd = self.setup_sender() if self.snd is not None: self.snd.durable = self.durable() self.rcv = self.setup_receiver() def teardown(self): if self.conn is not None and self.conn.connected(): self.conn.close() def content(self, base, count = None): if count is None: return "%s[%s]" % (base, self.test_id) else: return "%s[%s, %s]" % (base, count, self.test_id) def ping(self, ssn): PING_Q = 'ping-queue {create: always}' # send a message sender = ssn.sender(PING_Q, durable=self.durable()) content = self.content("ping") sender.send(content) receiver = ssn.receiver(PING_Q) msg = receiver.fetch(0) ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) def drain(self, rcv, limit=None, timeout=0, expected=None): contents = [] try: while limit is None or len(contents) < limit: contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass if expected is not None: assert expected == contents, "expected %s, got %s" % (expected, contents) return contents def assertEmpty(self, rcv): contents = self.drain(rcv) assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) def assertPending(self, rcv, expected): p = rcv.pending() assert p == expected, "expected %s, got %s" % (expected, p) def sleep(self): time.sleep(self.delay()) def delay(self): return float(self.config.defines.get("delay", "2")) def get_bool(self, name): return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") def durable(self): return self.get_bool("durable") def reconnect(self): return self.get_bool("reconnect") class SetupTests(Base): def testOpen(self): # XXX: need to flesh out URL support/syntax self.conn = Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) self.ping(self.conn.session()) def testConnect(self): # XXX: need to flesh out URL support/syntax self.conn = Connection(self.broker.host, self.broker.port, reconnect=self.reconnect()) self.conn.connect() self.ping(self.conn.session()) def testConnectError(self): try: self.conn = Connection.open("localhost", 0) assert False, "connect succeeded" except ConnectError, e: # XXX: should verify that e includes appropriate diagnostic info pass class ConnectionTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def testSessionAnon(self): ssn1 = self.conn.session() ssn2 = self.conn.session() self.ping(ssn1) self.ping(ssn2) assert ssn1 is not ssn2 def testSessionNamed(self): ssn1 = self.conn.session("one") ssn2 = self.conn.session("two") self.ping(ssn1) self.ping(ssn2) assert ssn1 is not ssn2 assert ssn1 is self.conn.session("one") assert ssn2 is self.conn.session("two") def testDisconnect(self): ssn = self.conn.session() self.ping(ssn) self.conn.disconnect() try: self.ping(ssn) assert False, "ping succeeded" except Disconnected: # this is the expected failure when pinging on a disconnected # connection pass self.conn.connect() self.ping(ssn) def testStart(self): ssn = self.conn.session() assert not ssn.started self.conn.start() assert ssn.started ssn2 = self.conn.session() assert ssn2.started def testStop(self): self.conn.start() ssn = self.conn.session() assert ssn.started self.conn.stop() assert not ssn.started ssn2 = self.conn.session() assert not ssn2.started def testClose(self): self.conn.close() assert not self.conn.connected() ACK_Q = 'test-ack-queue {create: always}' class SessionTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def testSender(self): snd = self.ssn.sender('test-snd-queue {create: always}', durable=self.durable()) snd2 = self.ssn.sender(snd.target, durable=self.durable()) assert snd is not snd2 snd2.close() content = self.content("testSender") snd.send(content) rcv = self.ssn.receiver(snd.target) msg = rcv.fetch(0) assert msg.content == content self.ssn.acknowledge(msg) def testReceiver(self): rcv = self.ssn.receiver('test-rcv-queue {create: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() content = self.content("testReceiver") snd = self.ssn.sender(rcv.source, durable=self.durable()) snd.send(content) msg = rcv.fetch(0) assert msg.content == content self.ssn.acknowledge(msg) def testNextReceiver(self): ADDR = 'test-next-rcv-queue {create: always}' rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) # XXX: this won't work if it is before the receiver creation self.ssn.start() snd = self.ssn.sender(ADDR) msgs = [] for i in range(10): content = self.content("testNextReceiver", i) snd.send(content) msgs.append(content) fetched = [] try: while True: rcv = self.ssn.next_receiver(timeout=self.delay()) assert rcv in (rcv1, rcv2, rcv3) assert rcv.pending() > 0 fetched.append(rcv.fetch().content) except Empty: pass assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) self.ssn.acknowledge() def testStart(self): START_Q = 'test-start-queue {create: always}' rcv = self.ssn.receiver(START_Q) assert not rcv.started self.ssn.start() assert rcv.started rcv = self.ssn.receiver(START_Q) assert rcv.started def testStop(self): STOP_Q = 'test-stop-queue {create: always}' self.ssn.start() rcv = self.ssn.receiver(STOP_Q) assert rcv.started self.ssn.stop() assert not rcv.started rcv = self.ssn.receiver(STOP_Q) assert not rcv.started # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): # send a bunch of messages snd = self.ssn.sender(ACK_Q, durable=self.durable()) contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) self.ssn.close() # drain the queue again, verify that they are all the messages # were requeued, and ack this time before closing self.ssn = self.conn.session() if ack_capacity is not None: self.ssn.ack_capacity = ack_capacity rcv = self.ssn.receiver(ACK_Q) self.drain(rcv, expected=contents) acker(self.ssn) self.ssn.close() # drain the queue a final time and verify that the messages were # dequeued self.ssn = self.conn.session() rcv = self.ssn.receiver(ACK_Q) self.assertEmpty(rcv) def testAcknowledge(self): self.ackTest(lambda ssn: ssn.acknowledge()) def testAcknowledgeAsync(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) def testAcknowledgeAsyncAckCap0(self): try: try: self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) assert False, "acknowledge shouldn't succeed with ack_capacity of zero" except InsufficientCapacity: pass finally: self.ssn.ack_capacity = UNLIMITED self.drain(self.ssn.receiver(ACK_Q)) self.ssn.acknowledge() def testAcknowledgeAsyncAckCap1(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) def testAcknowledgeAsyncAckCap5(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) def send(self, ssn, queue, base, count=1): snd = ssn.sender(queue, durable=self.durable()) contents = [] for i in range(count): c = self.content(base, i) snd.send(c) contents.append(c) snd.close() return contents def txTest(self, commit): TX_Q = 'test-tx-queue {create: always}' TX_Q_COPY = 'test-tx-queue-copy {create: always}' txssn = self.conn.session(transactional=True) contents = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) copy_rcv = self.ssn.receiver(txsnd.target) self.assertEmpty(copy_rcv) for i in range(3): m = txrcv.fetch(0) txsnd.send(m) self.assertEmpty(copy_rcv) txssn.acknowledge() if commit: txssn.commit() self.assertEmpty(rcv) assert contents == self.drain(copy_rcv) else: txssn.rollback() assert contents == self.drain(rcv) self.assertEmpty(copy_rcv) self.ssn.acknowledge() def testCommit(self): self.txTest(True) def testRollback(self): self.txTest(False) def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue {create: always}' txssn = self.conn.session(transactional=True) contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() assert contents == self.drain(rcv) self.ssn.acknowledge() else: txssn.rollback() self.assertEmpty(rcv) txssn.commit() self.assertEmpty(rcv) def testCommitSend(self): self.txTestSend(True) def testRollbackSend(self): self.txTestSend(False) def txTestAck(self, commit): TX_ACK_Q = 'test-tx-ack-queue {create: always}' txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_Q) self.assertEmpty(txrcv) contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3) assert contents == self.drain(txrcv) if commit: txssn.acknowledge() else: txssn.rollback() drained = self.drain(txrcv) assert contents == drained, "expected %s, got %s" % (contents, drained) txssn.acknowledge() txssn.rollback() assert contents == self.drain(txrcv) txssn.commit() # commit without ack self.assertEmpty(txrcv) txssn.close() txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_Q) assert contents == self.drain(txrcv) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_Q) self.assertEmpty(rcv) txssn.close() self.assertEmpty(rcv) def testCommitAck(self): self.txTestAck(True) def testRollbackAck(self): self.txTestAck(False) def testClose(self): self.ssn.close() try: self.ping(self.ssn) assert False, "ping succeeded" except Disconnected: pass RECEIVER_Q = 'test-receiver-queue {create: always}' class ReceiverTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): return self.ssn.sender(RECEIVER_Q) def setup_receiver(self): return self.ssn.receiver(RECEIVER_Q) def send(self, base, count = None): content = self.content(base, count) self.snd.send(content) return content def testFetch(self): try: msg = self.rcv.fetch(0) assert False, "unexpected message: %s" % msg except Empty: pass try: start = time.time() msg = self.rcv.fetch(self.delay()) assert False, "unexpected message: %s" % msg except Empty: elapsed = time.time() - start assert elapsed >= self.delay() one = self.send("testFetch", 1) two = self.send("testFetch", 2) three = self.send("testFetch", 3) msg = self.rcv.fetch(0) assert msg.content == one msg = self.rcv.fetch(self.delay()) assert msg.content == two msg = self.rcv.fetch() assert msg.content == three self.ssn.acknowledge() def testStart(self): content = self.send("testStart") self.sleep() assert self.rcv.pending() == 0 self.rcv.start() self.sleep() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) assert msg.content == content assert self.rcv.pending() == 0 self.ssn.acknowledge() def testStop(self): self.rcv.start() one = self.send("testStop", 1) self.sleep() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) assert msg.content == one self.rcv.stop() two = self.send("testStop", 2) self.sleep() assert self.rcv.pending() == 0 msg = self.rcv.fetch(0) assert msg.content == two self.ssn.acknowledge() def testPending(self): self.rcv.start() assert self.rcv.pending() == 0 for i in range(3): self.send("testPending", i) self.sleep() assert self.rcv.pending() == 3 for i in range(3, 10): self.send("testPending", i) self.sleep() assert self.rcv.pending() == 10 self.drain(self.rcv, limit=3) assert self.rcv.pending() == 7 self.drain(self.rcv) assert self.rcv.pending() == 0 self.ssn.acknowledge() def testCapacity(self): self.rcv.capacity = 5 self.rcv.start() self.assertPending(self.rcv, 0) for i in range(15): self.send("testCapacity", i) self.sleep() self.assertPending(self.rcv, 5) self.drain(self.rcv, limit = 5) self.sleep() self.assertPending(self.rcv, 5) drained = self.drain(self.rcv) assert len(drained) == 10, "%s, %s" % (len(drained), drained) self.assertPending(self.rcv, 0) self.ssn.acknowledge() def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED self.rcv.start() self.assertPending(self.rcv, 0) for i in range(10): self.send("testCapacityUNLIMITED", i) self.sleep() self.assertPending(self.rcv, 10) self.drain(self.rcv) self.assertPending(self.rcv, 0) self.ssn.acknowledge() # XXX: need testClose NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "{bad address}" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" class AddressErrorTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def sendErrorTest(self, addr, exc, check=lambda e: True): snd = self.ssn.sender(addr, durable=self.durable()) try: snd.send("hello") assert False, "send succeeded" except exc, e: assert check(e), "unexpected error: %s" % compat.format_exc(e) snd.close() def fetchErrorTest(self, addr, exc, check=lambda e: True): rcv = self.ssn.receiver(addr) try: rcv.fetch(timeout=0) assert False, "fetch succeeded" except exc, e: assert check(e), "unexpected error: %s" % compat.format_exc(e) rcv.close() def testNoneTarget(self): # XXX: should have specific exception for this self.sendErrorTest(None, SendError) def testNoneSource(self): # XXX: should have specific exception for this self.fetchErrorTest(None, ReceiveError) def testNoTarget(self): # XXX: should have specific exception for this self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) def testNoSource(self): # XXX: should have specific exception for this self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) def testUnparseableTarget(self): # XXX: should have specific exception for this self.sendErrorTest(UNPARSEABLE_ADDR, SendError, lambda e: "expecting ID" in str(e)) def testUnparseableSource(self): # XXX: should have specific exception for this self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError, lambda e: "expecting ID" in str(e)) def testUnlexableTarget(self): # XXX: should have specific exception for this self.sendErrorTest(UNLEXABLE_ADDR, SendError, lambda e: "unrecognized character" in str(e)) def testUnlexableSource(self): # XXX: should have specific exception for this self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError, lambda e: "unrecognized character" in str(e)) SENDER_Q = 'test-sender-q {create: always}' class SenderTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): return self.ssn.sender(SENDER_Q) def setup_receiver(self): return self.ssn.receiver(SENDER_Q) def checkContent(self, content): self.snd.send(content) msg = self.rcv.fetch(0) assert msg.content == content out = Message(content) self.snd.send(out) echo = self.rcv.fetch(0) assert out.content == echo.content assert echo.content == msg.content self.ssn.acknowledge() def testSendString(self): self.checkContent(self.content("testSendString")) def testSendList(self): self.checkContent(["testSendList", 1, 3.14, self.test_id]) def testSendMap(self): self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) def asyncTest(self, capacity): self.snd.capacity = capacity msgs = [self.content("asyncTest", i) for i in range(15)] for m in msgs: self.snd.send(m, sync=False) drained = self.drain(self.rcv, timeout=self.delay()) assert msgs == drained, "expected %s, got %s" % (msgs, drained) self.ssn.acknowledge() def testSendAsyncCapacity0(self): try: self.asyncTest(0) assert False, "send shouldn't succeed with zero capacity" except InsufficientCapacity: # this is expected pass def testSendAsyncCapacity1(self): self.asyncTest(1) def testSendAsyncCapacity5(self): self.asyncTest(5) def testSendAsyncCapacityUNLIMITED(self): self.asyncTest(UNLIMITED) def testCapacityTimeout(self): self.snd.capacity = 1 msgs = [] caught = False while len(msgs) < 100: m = self.content("testCapacity", len(msgs)) try: self.snd.send(m, sync=False, timeout=0) msgs.append(m) except InsufficientCapacity: caught = True break self.snd.sync() self.drain(self.rcv, expected=msgs) self.ssn.acknowledge() assert caught, "did not exceed capacity" class MessageTests(Base): def testCreateString(self): m = Message("string") assert m.content == "string" assert m.content_type is None def testCreateUnicode(self): m = Message(u"unicode") assert m.content == u"unicode" assert m.content_type == "text/plain; charset=utf8" def testCreateMap(self): m = Message({}) assert m.content == {} assert m.content_type == "amqp/map" def testCreateList(self): m = Message([]) assert m.content == [] assert m.content_type == "amqp/list" def testContentTypeOverride(self): m = Message() m.content_type = "text/html; charset=utf8" m.content = u"" assert m.content_type == "text/html; charset=utf8" ECHO_Q = 'test-message-echo-queue {create: always}' class MessageEchoTests(Base): def setup_connection(self): return Connection.open(self.broker.host, self.broker.port, reconnect=self.reconnect()) def setup_session(self): return self.conn.session() def setup_sender(self): return self.ssn.sender(ECHO_Q) def setup_receiver(self): return self.ssn.receiver(ECHO_Q) def check(self, msg): self.snd.send(msg) echo = self.rcv.fetch(0) assert msg.id == echo.id assert msg.subject == echo.subject assert msg.user_id == echo.user_id assert msg.to == echo.to assert msg.reply_to == echo.reply_to assert msg.correlation_id == echo.correlation_id assert msg.properties == echo.properties assert msg.content_type == echo.content_type assert msg.content == echo.content, "%s, %s" % (msg, echo) self.ssn.acknowledge(echo) def testStringContent(self): self.check(Message("string")) def testUnicodeContent(self): self.check(Message(u"unicode")) TEST_MAP = {"key1": "string", "key2": u"unicode", "key3": 3, "key4": -3, "key5": 3.14, "key6": -3.14, "key7": ["one", 2, 3.14], "key8": [], "key9": {"sub-key0": 3}} def testMapContent(self): self.check(Message(MessageEchoTests.TEST_MAP)) def testListContent(self): self.check(Message([])) self.check(Message([1, 2, 3])) self.check(Message(["one", 2, 3.14, {"four": 4}])) def testProperties(self): msg = Message() msg.to = "to-address" msg.subject = "subject" msg.correlation_id = str(self.test_id) msg.properties = MessageEchoTests.TEST_MAP msg.reply_to = "reply-address" self.check(msg) class TestTestsXXX(Test): def testFoo(self): print "this test has output" def testBar(self): print "this test "*8 print "has"*10 print "a"*75 print "lot of"*10 print "output"*10 def testQux(self): import sys sys.stdout.write("this test has output with no newline") def testQuxFail(self): import sys sys.stdout.write("this test has output with no newline") fdsa