# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys, re, traceback, socket from getopt import getopt, GetoptError from qpid.connection import Connection from qpid.util import connect from qpid.datatypes import Message, RangedSet from qpid.queue import Empty from qpid.session import SessionException from qpid.testlib import TestBase010 from time import sleep class PersistenceTest(TestBase010): XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 0 def createMessage(self, **kwargs): session = self.session dp = {} dp['delivery_mode'] = 2 mp = {} for k, v in kwargs.iteritems(): if k in ['routing_key', 'delivery_mode']: dp[k] = v if k in ['message_id', 'correlation_id', 'application_headers']: mp[k] = v args = [] args.append(session.delivery_properties(**dp)) if len(mp): args.append(session.message_properties(**mp)) if kwargs.has_key('body'): args.append(kwargs['body']) return Message(*args) def phase1(self): session = self.session session.queue_declare(queue="queue-a", durable=True) session.queue_declare(queue="queue-b", durable=True) session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a") session.exchange_bind(queue="queue-b", exchange="amq.direct", binding_key="b") session.message_transfer(destination="amq.direct", message=self.createMessage(routing_key="a", correlation_id="Msg0001", body="A_Message1")) session.message_transfer(destination="amq.direct", message=self.createMessage(routing_key="b", correlation_id="Msg0002", body="B_Message1")) # session.queue_declare(queue="lvq-test", durable=True, arguments={"qpid.last_value_queue":True}) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B1")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A1")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A2")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B2")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1")) def phase2(self): session = self.session #check queues exists session.queue_declare(queue="queue-a", durable=True, passive=True) session.queue_declare(queue="queue-b", durable=True, passive=True) #check they are still bound to amq.direct correctly responses = [] responses.append(session.exchange_bound(queue="queue-a", exchange="amq.direct", binding_key="a")) responses.append(session.exchange_bound(queue="queue-b", exchange="amq.direct", binding_key="b")) for r in responses: self.assert_(not r.exchange_not_found) self.assert_(not r.queue_not_found) self.assert_(not r.key_not_matched) #check expected messages are there self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1") self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1") self.assertEmptyQueue("queue-a") self.assertEmptyQueue("queue-b") session.queue_declare(queue="queue-c", durable=True) #send a message to a topic such that it reaches all queues session.exchange_bind(queue="queue-a", exchange="amq.topic", binding_key="abc") session.exchange_bind(queue="queue-b", exchange="amq.topic", binding_key="abc") session.exchange_bind(queue="queue-c", exchange="amq.topic", binding_key="abc") session.message_transfer(destination="amq.topic", message=self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2")) # #check LVQ exists and has exepected messages: # session.queue_declare(queue="lvq-test", durable=True, passive=True) # session.message_subscribe(destination="lvq", queue="lvq-test") # lvq = session.incoming("lvq") # lvq.start() # accepted = RangedSet() # for m in ["A2", "B3", "C1"]: # msg = lvq.get(timeout=1) # self.assertEquals(m, msg.body) # accepted.add(msg.id) # try: # extra = lvq.get(timeout=1) # self.fail("lvq-test not empty, contains: " + extra.body) # except Empty: None # #publish some more messages while subscriber is active (no replacement): # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C2")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C3")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A3")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A4")) # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C4")) # #check that accepting replaced messages is safe # session.message_accept(accepted) def phase3(self): session = self.session # #lvq recovery validation # session.queue_declare(queue="lvq-test", durable=True, passive=True) # session.message_subscribe(destination="lvq", queue="lvq-test") # lvq = session.incoming("lvq") # lvq.start() # accepted = RangedSet() # lvq.start() # for m in ["C4", "A4"]: # msg = lvq.get(timeout=1) # self.assertEquals(m, msg.body) # accepted.add(msg.id) # session.message_accept(accepted) # try: # extra = lvq.get(timeout=1) # self.fail("lvq-test not empty, contains: " + extra.body) # except Empty: None # session.message_cancel(destination="lvq") # session.queue_delete(queue="lvq-test") #check queues exists session.queue_declare(queue="queue-a", durable=True, passive=True) session.queue_declare(queue="queue-b", durable=True, passive=True) session.queue_declare(queue="queue-c", durable=True, passive=True) session.tx_select() #check expected messages are there self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2") self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2") self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2") self.assertEmptyQueue("queue-a") self.assertEmptyQueue("queue-b") self.assertEmptyQueue("queue-c") #note: default bindings must be restored for this to work session.message_transfer(message=self.createMessage( routing_key="queue-a", correlation_id="Msg0004", body="A_Message3")) session.message_transfer(message=self.createMessage( routing_key="queue-a", correlation_id="Msg0005", body="A_Message4")) session.message_transfer(message=self.createMessage( routing_key="queue-a", correlation_id="Msg0006", body="A_Message5")) session.tx_commit() #delete a queue session.queue_delete(queue="queue-c") session.message_subscribe(destination="ctag", queue="queue-a", accept_mode=0) session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF) session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF) included = session.incoming("ctag") msg1 = included.get(timeout=1) self.assertExpectedContent(msg1, "Msg0004", "A_Message3") msg2 = included.get(timeout=1) self.assertExpectedContent(msg2, "Msg0005", "A_Message4") msg3 = included.get(timeout=1) self.assertExpectedContent(msg3, "Msg0006", "A_Message5") self.ack(msg1, msg2, msg3) session.message_transfer(destination="amq.direct", message=self.createMessage( routing_key="queue-b", correlation_id="Msg0007", body="B_Message3")) session.tx_rollback() def phase4(self): session = self.session #check queues exists session.queue_declare(queue="queue-a", durable=True, passive=True) session.queue_declare(queue="queue-b", durable=True, passive=True) self.assertMessageOnQueue("queue-a", "Msg0004", "A_Message3") self.assertMessageOnQueue("queue-a", "Msg0005", "A_Message4") self.assertMessageOnQueue("queue-a", "Msg0006", "A_Message5") self.assertEmptyQueue("queue-a") self.assertEmptyQueue("queue-b") #check this queue doesn't exist try: session.queue_declare(queue="queue-c", durable=True, passive=True) raise Exception("Expected queue-c to have been deleted") except SessionException, e: self.assertEquals(404, e.args[0].error_code) def phase5(self): session = self.session queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"] for q in queues: session.queue_declare(queue=q, durable=True) session.queue_purge(queue=q) session.message_transfer(message=self.createMessage( routing_key="queue-a1", correlation_id="MsgA", body="MessageA")) session.message_transfer(message=self.createMessage( routing_key="queue-b1", correlation_id="MsgB", body="MessageB")) session.message_transfer(message=self.createMessage( routing_key="queue-c1", correlation_id="MsgC", body="MessageC")) session.message_transfer(message=self.createMessage( routing_key="queue-d1", correlation_id="MsgD", body="MessageD")) session.dtx_select() txa = self.xid('a') txb = self.xid('b') txc = self.xid('c') txd = self.xid('d') self.txswap("queue-a1", "queue-a2", txa) self.txswap("queue-b1", "queue-b2", txb) self.txswap("queue-c1", "queue-c2", txc) self.txswap("queue-d1", "queue-d2", txd) #no queue should have any messages accessible for q in queues: self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status) self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status) self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status) self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status) #further checks not_empty = ["queue-a2", "queue-b1"] for q in queues: if q in not_empty: self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) else: self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) def phase6(self): session = self.session #check prepared transaction are reported correctly by recover txc = self.xid('c') txd = self.xid('d') xids = session.dtx_recover().in_doubt ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these if txc.global_id not in ids: self.fail("Recovered xids not as expected. missing: %s" % (txc)) if txd.global_id not in ids: self.fail("Recovered xids not as expected. missing: %s" % (txd)) self.assertEqual(2, len(xids)) queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"] not_empty = ["queue-a2", "queue-b1"] #re-check not_empty = ["queue-a2", "queue-b1"] for q in queues: if q in not_empty: self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) else: self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q)) #complete the prepared transactions self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status) self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status) not_empty.append("queue-c2") not_empty.append("queue-d1") for q in queues: if q in not_empty: self.assertEqual(1, session.queue_query(queue=q).message_count) else: self.assertEqual(0, session.queue_query(queue=q).message_count) def phase7(self): session = self.session session.synchronous = False # check xids from phase 6 are gone txc = self.xid('c') txd = self.xid('d') xids = session.dtx_recover().in_doubt ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these if txc.global_id in ids: self.fail("Xid still present : %s" % (txc)) if txd.global_id in ids: self.fail("Xid still present : %s" % (txc)) self.assertEqual(0, len(xids)) #test deletion of queue after publish #create queue session.queue_declare(queue = "q", auto_delete=True, durable=True) #send message for i in range(1, 10): session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message")) session.synchronous = True #explicitly delete queue session.queue_delete(queue = "q") #test acking of message from auto-deleted queue #create queue session.queue_declare(queue = "q", auto_delete=True, durable=True) #send message session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message")) #create consumer session.message_subscribe(queue = "q", destination = "a", accept_mode=0, acquire_mode=0) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") queue = session.incoming("a") #consume the message, cancel subscription (triggering auto-delete), then ack it msg = queue.get(timeout = 5) session.message_cancel(destination = "a") self.ack(msg) #test implicit deletion of bindings when queue is deleted session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message")) session.queue_delete(queue = "durable-subscriber-queue") #test unbind: #create a series of bindings to a queue session.queue_declare(queue = "binding-test-queue", durable=True) session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="abc") session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr") session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="xyz") session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"}) session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"}) session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"}) #then restart broker... def phase8(self): session = self.session #continue testing unbind: #send messages to the queue via each of the bindings for k in ["abc", "pqr", "xyz"]: data = "first %s" % (k) session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data)) for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: data = "first %s" % (a["p"]) session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data)) #unbind some bindings (using final 0-10 semantics) session.exchange_unbind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr") session.exchange_unbind(exchange="amq.match", queue="binding-test-queue", binding_key="b") #send messages again for k in ["abc", "pqr", "xyz"]: data = "second %s" % (k) session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data)) for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: data = "second %s" % (a["p"]) session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data)) #check that only the correct messages are received expected = [] for k in ["abc", "pqr", "xyz"]: expected.append("first %s" % (k)) for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]: expected.append("first %s" % (a["p"])) for k in ["abc", "xyz"]: expected.append("second %s" % (k)) for a in [{"p":"a"}, {"p":"c"}]: expected.append("second %s" % (a["p"])) session.message_subscribe(queue = "binding-test-queue", destination = "binding-test") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test") session.message_flow(unit = 0, value = 10, destination = "binding-test") queue = session.incoming("binding-test") while len(expected): msg = queue.get(timeout=1) if msg.body not in expected: self.fail("Missing message: %s" % msg.body) expected.remove(msg.body) try: msg = queue.get(timeout=1) self.fail("Got extra message: %s" % msg.body) except Empty: pass session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message")) session.queue_delete(queue = "durable-subscriber-queue") def xid(self, txid, branchqual = ''): return self.session.xid(format=0, global_id=txid, branch_id=branchqual) def txswap(self, src, dest, tx): self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0) self.session.message_flow(destination="temp-swap", unit=0, value=1) self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) msg = self.session.incoming("temp-swap").get(timeout=1) self.session.message_cancel(destination="temp-swap") self.session.message_transfer(message=self.createMessage(routing_key=dest, correlation_id=self.getProperty(msg, 'correlation_id'), body=msg.body)) self.ack(msg) self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) def assertEmptyQueue(self, name): self.assertEqual(0, self.session.queue_query(queue=name).message_count) def assertConnectionException(self, expectedCode, message): self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertExpectedMethod(self, reply, klass, method): self.assertEqual(klass, reply.method.klass.name) self.assertEqual(method, reply.method.name) def assertExpectedContent(self, msg, id, body): self.assertEqual(id, self.getProperty(msg, 'correlation_id')) self.assertEqual(body, msg.body) return msg def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) return None def ack(self, *msgs): session = self.session set = RangedSet() for m in msgs: set.add(m.id) #TODO: tidy up completion session.receiver._completed.add(m.id) session.message_accept(set) session.channel.session_completed(session.receiver._completed) def assertExpectedGetResult(self, id, body): return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body) def assertEqual(self, expected, actual, msg=''): if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual)) def assertMessageOnQueue(self, queue, id, body): self.session.message_subscribe(destination="incoming-gets", queue=queue, accept_mode=0) self.session.message_flow(destination="incoming-gets", unit=0, value=1) self.session.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF) msg = self.session.incoming("incoming-gets").get(timeout=1) self.assertExpectedContent(msg, id, body) self.ack(msg) self.session.message_cancel(destination="incoming-gets") def __init__(self): TestBase010.__init__(self, "run") self.setBroker("localhost") self.errata = [] def connect(self): """ Connects to the broker """ self.conn = Connection(connect(self.host, self.port)) self.conn.start(timeout=10) self.session = self.conn.session("test-session", timeout=10) def run(self, args=sys.argv[1:]): try: opts, extra = getopt(args, "r:s:e:b:p:h", ["retry=", "spec=", "errata=", "broker=", "phase=", "help"]) except GetoptError, e: self._die(str(e)) phase = 0 retry = 0; for opt, value in opts: if opt in ("-h", "--help"): self._die() if opt in ("-s", "--spec"): self.spec = value if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) if opt in ("-p", "--phase"): phase = int(value) if opt in ("-r", "--retry"): retry = int(value) if not phase: self._die("please specify the phase to run") phase = "phase%d" % phase self.connect() try: getattr(self, phase)() print phase, "succeeded" res = True; except Exception, e: print phase, "failed: ", e traceback.print_exc() res = False if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) # Crude fix to wait for thread in client to exit after return from session_close() # Reduces occurrences of "Unhandled exception in thread" messages after each test import time time.sleep(1) return res def setBroker(self, broker): rex = re.compile(r""" # [ [ / ] @] [ : ] ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) match = rex.match(broker) if not match: self._die("'%s' is not a valid broker" % (broker)) self.user, self.password, self.host, self.port = match.groups() self.port = int(default(self.port, 5672)) self.user = default(self.user, "guest") self.password = default(self.password, "guest") def _die(self, message = None): if message: print message print """ Options: -h/--help : this message -s/--spec : file containing amqp XML spec -p/--phase : test phase to run -b/--broker [[/]@][:] : broker to connect to """ sys.exit(1) def default(value, default): if (value == None): return default else: return value if __name__ == "__main__": test = PersistenceTest() if not test.run(): sys.exit(1)