diff options
Diffstat (limited to 'qpid/cpp/src/tests/legacystore/python_tests')
4 files changed, 850 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/__init__.py b/qpid/cpp/src/tests/legacystore/python_tests/__init__.py new file mode 100644 index 0000000000..ebb9da8670 --- /dev/null +++ b/qpid/cpp/src/tests/legacystore/python_tests/__init__.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Do not delete - marks this directory as a python package. + +from client_persistence import * +from resize import * + diff --git a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py new file mode 100644 index 0000000000..37c12601be --- /dev/null +++ b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py @@ -0,0 +1,239 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os + +from brokertest import EXPECT_EXIT_OK +from store_test import StoreTest, Qmf, store_args +from qpid.messaging import * + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client. + +class ExchangeQueueTests(StoreTest): + """ + Simple tests of the broker exchange and queue types + """ + + def test_direct_exchange(self): + """Test Direct exchange.""" + broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) + msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001") + msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002") + broker.send_message("a", msg1) + broker.send_message("b", msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_direct_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg2, True) + + def test_topic_exchange(self): + """Test Topic exchange.""" + broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}") + snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}") + ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, key:key1}, " + "{exchange:abc, key: key2}]}, node:{durable:True}}") + ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + msg1 = Message("Message1", durable=True, correlation_id="Msg0003") + snd1.send(msg1) + msg2 = Message("Message2", durable=True, correlation_id="Msg0004") + snd2.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_topic_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg1, True) + self.check_messages(broker, "c", [msg1, msg2], True) + self.check_message(broker, "d", msg2, True) + self.check_message(broker, "e", msg2, True) + + + def test_legacy_lvq(self): + """Test legacy LVQ.""" + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) + ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) + mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"}) + mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"}) + mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"}) + mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], + xprops="arguments:{\"qpid.last_value_queue\":True}") + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) + # Add more messages while subscriber is active (no replacement): + ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"}) + ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"}) + mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"}) + mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"}) + mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn) + ssn.acknowledge() + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq") + self.check_messages(broker, "lvq-test", [ma4, mc4], True) + + + def test_fanout_exchange(self): + """Test Fanout Exchange""" + broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}") + msg1 = Message("Msg1", durable=True, correlation_id="Msg0001") + snd.send(msg1) + msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") + snd.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_fanout_exchange") + self.check_messages(broker, "q1", [msg1, msg2], True) + self.check_messages(broker, "q2", [msg1, msg2], True) + self.check_messages(broker, "q3", [msg1, msg2], True) + + + def test_message_reject(self): + broker = self.broker(store_args(), name="test_message_reject", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("tmr; {create:always, node:{type:queue, durable:True}}") + rcv = ssn.receiver("tmr; {create:always, node:{type:queue, durable:True}}") + m1 = Message("test_message_reject", durable=True, correlation_id="Msg0001") + snd.send(m1) + m2 = rcv.fetch() + ssn.acknowledge(message=m2, disposition=Disposition(REJECTED)) + broker.terminate() + + broker = self.broker(store_args(), name="test_message_reject") + qmf = Qmf(broker) + assert qmf.queue_message_count("tmr") == 0 + + + def test_route(self): + """ Test the recovery of a route (link and bridge objects.""" + broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + + # create a "link" + link_args = {"host":"a.fake.host.com", "port":9999, "durable":True, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = qmf_broker_obj.create("link", "test-link", link_args, False) + self.assertEqual(result.status, 0, result) + link = qmf.get_objects("link")[0] + + # create bridge + bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key", "durable":True} + result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False); + self.assertEqual(result.status, 0, result) + bridge = qmf.get_objects("bridge")[0] + + broker.terminate() + + # recover the link and bridge + broker = self.broker(store_args(), name="test_route") + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + self.assertEqual(len(qmf.get_objects("link")), 1) + self.assertEqual(len(qmf.get_objects("bridge")), 1) + + + +class AlternateExchangePropertyTests(StoreTest): + """ + Test the persistence of the Alternate Exchange property for exchanges and queues. + """ + + def test_exchange(self): + """Exchange alternate exchange property persistence test""" + broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_exchange") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_exchange("testExch", "direct", passive=True) + except Exception, error: + self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on exchange \"testExch\".") + qmf.close() + + def test_queue(self): + """Queue alternate exchange property persistexchangeNamece test""" + broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_queue") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_queue("testQueue", passive=True) + except Exception, error: + self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on queue \"testQueue\".") + qmf.close() + + +class RedeliveredTests(StoreTest): + """ + Test the behavior of the redelivered flag in the context of persistence + """ + + def test_broker_recovery(self): + """Test that the redelivered flag is set on messages after recovery of broker""" + broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + broker = self.broker(store_args(), name="test_broker_recovery") + rcv_msg = broker.get_message("testQueue") + self.assertEqual(msg_content, rcv_msg.content) + self.assertTrue(rcv_msg.redelivered) + diff --git a/qpid/cpp/src/tests/legacystore/python_tests/resize.py b/qpid/cpp/src/tests/legacystore/python_tests/resize.py new file mode 100644 index 0000000000..e719b755da --- /dev/null +++ b/qpid/cpp/src/tests/legacystore/python_tests/resize.py @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import glob +import os +import subprocess + +from brokertest import EXPECT_EXIT_OK +from qpid.datatypes import uuid4 +from store_test import StoreTest, store_args +from qpid.messaging import Message + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + +class ResizeTest(StoreTest): + + resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize") + print resize_tool + def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail): + for f in glob.glob(os.path.join(store_dir, "*")): + final_store_dir = os.path.join(f, queue_name) + p = subprocess.Popen([self.resize_tool, final_store_dir, "--num-jfiles", str(resize_num_files), + "--jfile-size-pgs", str(resize_file_size), "--quiet"], stdout = subprocess.PIPE, + stderr = subprocess.STDOUT) + res = p.wait() + err_found = False + try: + for l in p.stdout: + if exp_fail: + err_found = True + print "[Expected error]:", + print l, + finally: + p.stdout.close() + return res + + def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8, + init_file_size = 24, exp_fail = False, wait_time = None): + # Using a sender will force the creation of an empty persistent queue which is needed for some tests + broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time) + ssn = broker.connect().session() + snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name) + + msgs = [] + for index in range(0, num_msgs): + msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index) + msgs.append(msg) + snd.send(msg) + broker.terminate() + + res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files, + resize_file_size, exp_fail) + if res != 0: + if exp_fail: + return + self.fail("ERROR: Resize operation failed with return code %d" % res) + elif exp_fail: + self.fail("ERROR: Resize operation succeeded, but a failure was expected") + + broker = self.broker(store_args(), name="broker") + self.check_messages(broker, queue_name, msgs, True) + + # TODO: Check the physical files to check number and size are as expected. + + +class SimpleTest(ResizeTest): + """ + Simple tests of the resize utility for resizing a journal to larger and smaller sizes. + """ + + def test_empty_store_same(self): + self._resize_test(queue_name = "empty_store_same", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 8, resize_file_size = 24) + + def test_empty_store_up(self): + self._resize_test(queue_name = "empty_store_up", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_empty_store_down(self): + self._resize_test(queue_name = "empty_store_down", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 6, resize_file_size = 12) + +# TODO: Put into long tests, make sure there is > 128GB free disk space +# def test_empty_store_max(self): +# self._resize_test(queue_name = "empty_store_max", +# num_msgs = 0, msg_size = 0, +# init_num_files = 8, init_file_size = 24, +# resize_num_files = 64, resize_file_size = 32768, +# wait_time = 120) + + def test_empty_store_min(self): + self._resize_test(queue_name = "empty_store_min", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 1) + + def test_basic_up(self): + self._resize_test(queue_name = "basic_up", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_basic_down(self): + self._resize_test(queue_name = "basic_down", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 15) + + def test_basic_low(self): + self._resize_test(queue_name = "basic_low", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 4, + exp_fail = True) + + def test_basic_under(self): + self._resize_test(queue_name = "basic_under", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 3, + exp_fail = True) + + def test_very_large_msg_up(self): + self._resize_test(queue_name = "very_large_msg_up", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_very_large_msg_down(self): + self._resize_test(queue_name = "very_large_msg_down", + num_msgs = 4, msg_size = 2000000, + init_num_files = 16, init_file_size = 64, + resize_num_files = 16, resize_file_size = 48) + + def test_very_large_msg_low(self): + self._resize_test(queue_name = "very_large_msg_low", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 7, resize_file_size = 20, + exp_fail = True) + + def test_very_large_msg_under(self): + self._resize_test(queue_name = "very_large_msg_under", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 6, resize_file_size = 8, + exp_fail = True) diff --git a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py new file mode 100644 index 0000000000..cc846aefd4 --- /dev/null +++ b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py @@ -0,0 +1,417 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import re +from brokertest import BrokerTest +from qpid.messaging import Empty +from qmf.console import Session + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + + +def store_args(store_dir = None): + """Return the broker args necessary to load the async store""" + assert BrokerTest.store_lib + if store_dir == None: + return [] + return ["--store-dir", store_dir] + +class Qmf: + """ + QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available. + """ + def __init__(self, broker): + self.__session = Session() + self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port()) + + def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False, + arguments = None): + """Add a new exchange""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, + alternate_exchange=alt_exchange_name, passive=passive, durable=durable, + arguments=arguments) + else: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable, + arguments=arguments) + + def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None): + """Add a new queue""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive, + durable=durable, arguments=arguments) + else: + amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments) + + def delete_queue(self, queue_name): + """Delete an existing queue""" + amqp_session = self.__broker.getAmqpSession() + amqp_session.queue_delete(queue_name) + + def _query(self, name, _class, package, alt_exchange_name=None): + """Qmf query function which can optionally look for the presence of an alternate exchange name""" + try: + obj_list = self.__session.getObjects(_class=_class, _package=package) + found = False + for obj in obj_list: + if obj.name == name: + found = True + if alt_exchange_name != None: + alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange) + if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name: + return False + break + return found + except Exception: + return False + + + def query_exchange(self, exchange_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name) + + def query_queue(self, queue_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name) + + def queue_message_count(self, queue_name): + """Query the number of messages on a queue""" + queue_list = self.__session.getObjects(_class="queue", _name=queue_name) + if len(queue_list): + return queue_list[0].msgDepth + + def queue_empty(self, queue_name): + """Check if a queue is empty (has no messages waiting)""" + return self.queue_message_count(queue_name) == 0 + + def get_objects(self, target_class, target_package="org.apache.qpid.broker"): + return self.__session.getObjects(_class=target_class, _package=target_package) + + + def close(self): + self.__session.delBroker(self.__broker) + self.__session = None + + +class StoreTest(BrokerTest): + """ + This subclass of BrokerTest adds some convenience test/check functions + """ + + def _chk_empty(self, queue, receiver): + """Check if a queue is empty (has no more messages)""" + try: + msg = receiver.fetch(timeout=0) + self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg)) + except Empty: + pass + + @staticmethod + def make_message(msg_count, msg_size): + """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count""" + msg = "msg-%04d" % msg_count + msg_len = len(msg) + buff = "" + if msg_size != None and msg_size > msg_len: + for index in range(0, msg_size - msg_len): + if index == msg_size - msg_len - 1: + buff += "-" + else: + buff += chr(ord('a') + (index % 26)) + return buff + msg + + # Functions for formatting address strings + + @staticmethod + def _fmt_csv(string_list, list_braces = None): + """Format a list using comma-separation. Braces are optionally added.""" + if len(string_list) == 0: + return "" + first = True + str_ = "" + if list_braces != None: + str_ += list_braces[0] + for string in string_list: + if string != None: + if first: + first = False + else: + str_ += ", " + str_ += string + if list_braces != None: + str_ += list_braces[1] + return str_ + + def _fmt_map(self, string_list): + """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map + element('key:val').""" + return self._fmt_csv(string_list, list_braces="{}") + + def _fmt_list(self, string_list): + """Format a list [l1, l2, l3, ...] from a string list.""" + return self._fmt_csv(string_list, list_braces="[]") + + def addr_fmt(self, node_name, **kwargs): + """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address + string.""" + # Get keyword args + node_subject = kwargs.get("node_subject") + create_policy = kwargs.get("create_policy") + delete_policy = kwargs.get("delete_policy") + assert_policy = kwargs.get("assert_policy") + mode = kwargs.get("mode") + link = kwargs.get("link", False) + link_name = kwargs.get("link_name") + node_type = kwargs.get("node_type") + durable = kwargs.get("durable", False) + link_reliability = kwargs.get("link_reliability") + x_declare_list = kwargs.get("x_declare_list", []) + x_bindings_list = kwargs.get("x_bindings_list", []) + x_subscribe_list = kwargs.get("x_subscribe_list", []) + + node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0) + link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or + len(x_bindings_list) > 0 or len(x_subscribe_list) > 0) + assert not (node_flag and link_flag) + + opt_str_list = [] + if create_policy != None: + opt_str_list.append("create: %s" % create_policy) + if delete_policy != None: + opt_str_list.append("delete: %s" % delete_policy) + if assert_policy != None: + opt_str_list.append("assert: %s" % assert_policy) + if mode != None: + opt_str_list.append("mode: %s" % mode) + if node_flag or link_flag: + node_str_list = [] + if link_name != None: + node_str_list.append("name: \"%s\"" % link_name) + if node_type != None: + node_str_list.append("type: %s" % node_type) + if durable: + node_str_list.append("durable: True") + if link_reliability != None: + node_str_list.append("reliability: %s" % link_reliability) + if len(x_declare_list) > 0: + node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list)) + if len(x_bindings_list) > 0: + node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list)) + if len(x_subscribe_list) > 0: + node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list)) + if node_flag: + opt_str_list.append("node: %s" % self._fmt_map(node_str_list)) + else: + opt_str_list.append("link: %s" % self._fmt_map(node_str_list)) + addr_str = node_name + if node_subject != None: + addr_str += "/%s" % node_subject + if len(opt_str_list) > 0: + addr_str += "; %s" % self._fmt_map(opt_str_list) + return addr_str + + def snd_addr(self, node_name, **kwargs): + """ Create a send (node) address""" + # Get keyword args + topic = kwargs.get("topic") + topic_flag = kwargs.get("topic_flag", False) + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + durable = kwargs.get("durable", False) + exclusive = kwargs.get("exclusive", False) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + exchage_type = kwargs.get("exchage_type") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + node_type = None + if topic != None or topic_flag: + node_type = "topic" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + if exchage_type != None: + x_declare_list.append("type: %s" % exchage_type) + + return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy, + node_type=node_type, durable=durable, x_declare_list=x_declare_list) + + def rcv_addr(self, node_name, **kwargs): + """ Create a receive (link) address""" + # Get keyword args + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + link_name = kwargs.get("link_name") + durable = kwargs.get("durable", False) + browse = kwargs.get("browse", False) + exclusive = kwargs.get("exclusive", False) + binding_list = kwargs.get("binding_list", []) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + mode = None + if browse: + mode = "browse" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + x_bindings_list = [] + for binding in binding_list: + x_bindings_list.append("{exchange: %s, key: %s}" % binding) + if durable: reliability = 'at-least-once' + else: reliability = None + return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, + link_name=link_name, durable=durable, x_declare_list=x_declare_list, + x_bindings_list=x_bindings_list, link_reliability=reliability) + + def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): + """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" + return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse) + + def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False, + emtpy_flag=False): + """Check that messages is on a queue by dequeuing them and comparing them to the expected messages""" + if emtpy_flag: + num_msgs = 0 + else: + num_msgs = len(exp_msg_list) + ssn = broker.connect().session(transactional=transactional) + rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs) + if num_msgs > 0: + try: + recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)] + except Empty: + self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i)) + for i in range(0, len(recieved_msg_list)): + self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content) + self.assertEqual(recieved_msg_list[i].correlation_id, exp_msg_list[i].correlation_id) + if empty: + self._chk_empty(queue, rcvr) + if ack: + ssn.acknowledge() + if transactional: + ssn.commit() + ssn.connection.close() + else: + if transactional: + ssn.commit() + return ssn + + + # Functions for finding strings in the broker log file (or other files) + + @staticmethod + def _read_file(file_name): + """Returns the content of file named file_name as a string""" + file_handle = file(file_name) + try: + return file_handle.read() + finally: + file_handle.close() + + def _get_hits(self, broker, search): + """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple + queues)""" + # TODO: Use sets when RHEL-4 is no longer supported + hits = [] + for hit in search.findall(self._read_file(broker.log)): + if hit not in hits: + hits.append(hit) + return hits + + def _reconsile_hits(self, broker, ftd_msgs, release_hits): + """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining + release_hits.""" + for msg in ftd_msgs: + found = False + for hit in release_hits: + if str(msg.id) in hit: + release_hits.remove(hit) + #print "Found %s in %s" % (msg.id, broker.log) + found = True + break + if not found: + self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log)) + if len(release_hits) > 0: + err = "Messages were unexpectedly released in log %s:\n" % broker.log + for hit in release_hits: + err += " %s\n" % hit + self.assert_(False, err) + + def check_msg_release(self, broker, ftd_msgs): + """ Check for 'Content released' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_commit(self, broker, ftd_msgs): + """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_recover(self, broker, ftd_msgs): + """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released after recovery$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block_on_commit(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) |