diff options
Diffstat (limited to 'qpid/cpp/src/tests/linearstore')
-rw-r--r-- | qpid/cpp/src/tests/linearstore/CMakeLists.txt | 29 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh | 55 | ||||
-rw-r--r-- | qpid/cpp/src/tests/linearstore/python_tests/__init__.py | 23 | ||||
-rw-r--r-- | qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py | 239 | ||||
-rw-r--r-- | qpid/cpp/src/tests/linearstore/python_tests/store_test.py | 417 | ||||
-rw-r--r-- | qpid/cpp/src/tests/linearstore/run_long_python_tests | 21 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/linearstore/run_python_tests | 42 | ||||
-rw-r--r-- | qpid/cpp/src/tests/linearstore/run_short_python_tests | 21 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/linearstore/tx-test-soak.sh | 275 |
9 files changed, 1122 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/linearstore/CMakeLists.txt b/qpid/cpp/src/tests/linearstore/CMakeLists.txt new file mode 100644 index 0000000000..bf6c164818 --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/CMakeLists.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if(BUILD_LINEARSTORE AND BUILD_TESTING) + +message(STATUS "Building linearstore tests") + +set(test_wrap ${shell} ${CMAKE_SOURCE_DIR}/src/tests/run_test${test_script_suffix} -buildDir=${CMAKE_BINARY_DIR}) + +add_test (linearstore_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/run_python_tests${test_script_suffix}) + +endif (BUILD_LINEARSTORE AND BUILD_TESTING) + diff --git a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh new file mode 100755 index 0000000000..ef39767e9b --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This script sets up a test directory which contains both +# recoverable and non-recoverable files and directories for +# the empty file pool (EFP). + +# NOTE: The following is based on typical development tree paths, not installed paths + +BASE_DIR=${HOME}/RedHat +STORE_DIR=${BASE_DIR} +PYTHON_TOOLS_DIR=${BASE_DIR}/qpid/tools/src/linearstore +export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py + +# Remove old dirs (if present) +rm -rf ${STORE_DIR}/qls +rm -rf ${STORE_DIR}/p002 +rm ${STORE_DIR}/p004 + +# Create new dir tree and links +mkdir ${STORE_DIR}/p002_ext +touch ${STORE_DIR}/p004_ext +mkdir ${STORE_DIR}/qls +mkdir ${STORE_DIR}/qls/p001 +touch ${STORE_DIR}/qls/p003 +ln -s ${STORE_DIR}/p002_ext ${STORE_DIR}/qls/p002 +ln -s ${STORE_DIR}/p004_ext ${STORE_DIR}/qls/p004 + +# Populate efp dirs with empty files +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25 +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25 +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25 + +# Show the result for information +${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l +tree -la $STORE_DIR/qls + diff --git a/qpid/cpp/src/tests/linearstore/python_tests/__init__.py b/qpid/cpp/src/tests/linearstore/python_tests/__init__.py new file mode 100644 index 0000000000..1e59d403e4 --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/python_tests/__init__.py @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Do not delete - marks this directory as a python package. + +from client_persistence import * + diff --git a/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py b/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py new file mode 100644 index 0000000000..9ff9480c4c --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py @@ -0,0 +1,239 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os + +from brokertest import EXPECT_EXIT_OK +from store_test import StoreTest, Qmf, store_args +from qpid.messaging import * + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client. + +class ExchangeQueueTests(StoreTest): + """ + Simple tests of the broker exchange and queue types + """ + + def test_direct_exchange(self): + """Test Direct exchange.""" + broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) + msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001") + msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002") + broker.send_message("a", msg1) + broker.send_message("b", msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_direct_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg2, True) + + def test_topic_exchange(self): + """Test Topic exchange.""" + broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}") + snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}") + ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, key:key1}, " + "{exchange:abc, key: key2}]}, node:{durable:True}}") + ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + msg1 = Message("Message1", durable=True, correlation_id="Msg0003") + snd1.send(msg1) + msg2 = Message("Message2", durable=True, correlation_id="Msg0004") + snd2.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_topic_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg1, True) + self.check_messages(broker, "c", [msg1, msg2], True) + self.check_message(broker, "d", msg2, True) + self.check_message(broker, "e", msg2, True) + + + def test_legacy_lvq(self): + """Test legacy LVQ.""" + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) + ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) + mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"}) + mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"}) + mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"}) + mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], + xprops="arguments:{\"qpid.last_value_queue\":True}") + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) + # Add more messages while subscriber is active (no replacement): + ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"}) + ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"}) + mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"}) + mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"}) + mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn) + ssn.acknowledge() + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq") + self.check_messages(broker, "lvq-test", [ma4, mc4], True) + + + def test_fanout_exchange(self): + """Test Fanout Exchange""" + broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}") + msg1 = Message("Msg1", durable=True, correlation_id="Msg0001") + snd.send(msg1) + msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") + snd.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_fanout_exchange") + self.check_messages(broker, "q1", [msg1, msg2], True) + self.check_messages(broker, "q2", [msg1, msg2], True) + self.check_messages(broker, "q3", [msg1, msg2], True) + + + def test_message_reject(self): + broker = self.broker(store_args(), name="test_message_reject", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("tmr; {create:always, node:{type:queue, durable:True}}") + rcv = ssn.receiver("tmr; {create:always, node:{type:queue, durable:True}}") + m1 = Message("test_message_reject", durable=True, correlation_id="Msg0001") + snd.send(m1) + m2 = rcv.fetch() + ssn.acknowledge(message=m2, disposition=Disposition(REJECTED)) + broker.terminate() + + broker = self.broker(store_args(), name="test_message_reject") + qmf = Qmf(broker) + assert qmf.queue_message_count("tmr") == 0 + + + def test_route(self): + """ Test the recovery of a route (link and bridge objects.""" + broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + + # create a "link" + link_args = {"host":"a.fake.host.com", "port":9999, "durable":True, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = qmf_broker_obj.create("link", "test-link", link_args, False) + self.assertEqual(result.status, 0, result) + link = qmf.get_objects("link")[0] + + # create bridge + bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key", "durable":True} + result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False); + self.assertEqual(result.status, 0, result) + bridge = qmf.get_objects("bridge")[0] + + broker.terminate() + + # recover the link and bridge + broker = self.broker(store_args(), name="test_route") + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + self.assertEqual(len(qmf.get_objects("link")), 1) + self.assertEqual(len(qmf.get_objects("bridge")), 1) + + + +class AlternateExchangePropertyTests(StoreTest): + """ + Test the persistence of the Alternate Exchange property for exchanges and queues. + """ + + def test_exchange(self): + """Exchange alternate exchange property persistence test""" + broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_exchange") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_exchange("testExch", "direct", passive=True) + except Exception, error: + self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on exchange \"testExch\".") + qmf.close() + + def test_queue(self): + """Queue alternate exchange property persistexchangeNamece test""" + broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_queue") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_queue("testQueue", passive=True) + except Exception, error: + self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on queue \"testQueue\".") + qmf.close() + + +class RedeliveredTests(StoreTest): + """ + Test the behavior of the redelivered flag in the context of persistence + """ + + def test_broker_recovery(self): + """Test that the redelivered flag is set on messages after recovery of broker""" + broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + broker = self.broker(store_args(), name="test_broker_recovery") + rcv_msg = broker.get_message("testQueue") + self.assertEqual(msg_content, rcv_msg.content) + self.assertTrue(rcv_msg.redelivered) + diff --git a/qpid/cpp/src/tests/linearstore/python_tests/store_test.py b/qpid/cpp/src/tests/linearstore/python_tests/store_test.py new file mode 100644 index 0000000000..cc846aefd4 --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/python_tests/store_test.py @@ -0,0 +1,417 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import re +from brokertest import BrokerTest +from qpid.messaging import Empty +from qmf.console import Session + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + + +def store_args(store_dir = None): + """Return the broker args necessary to load the async store""" + assert BrokerTest.store_lib + if store_dir == None: + return [] + return ["--store-dir", store_dir] + +class Qmf: + """ + QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available. + """ + def __init__(self, broker): + self.__session = Session() + self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port()) + + def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False, + arguments = None): + """Add a new exchange""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, + alternate_exchange=alt_exchange_name, passive=passive, durable=durable, + arguments=arguments) + else: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable, + arguments=arguments) + + def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None): + """Add a new queue""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive, + durable=durable, arguments=arguments) + else: + amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments) + + def delete_queue(self, queue_name): + """Delete an existing queue""" + amqp_session = self.__broker.getAmqpSession() + amqp_session.queue_delete(queue_name) + + def _query(self, name, _class, package, alt_exchange_name=None): + """Qmf query function which can optionally look for the presence of an alternate exchange name""" + try: + obj_list = self.__session.getObjects(_class=_class, _package=package) + found = False + for obj in obj_list: + if obj.name == name: + found = True + if alt_exchange_name != None: + alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange) + if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name: + return False + break + return found + except Exception: + return False + + + def query_exchange(self, exchange_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name) + + def query_queue(self, queue_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name) + + def queue_message_count(self, queue_name): + """Query the number of messages on a queue""" + queue_list = self.__session.getObjects(_class="queue", _name=queue_name) + if len(queue_list): + return queue_list[0].msgDepth + + def queue_empty(self, queue_name): + """Check if a queue is empty (has no messages waiting)""" + return self.queue_message_count(queue_name) == 0 + + def get_objects(self, target_class, target_package="org.apache.qpid.broker"): + return self.__session.getObjects(_class=target_class, _package=target_package) + + + def close(self): + self.__session.delBroker(self.__broker) + self.__session = None + + +class StoreTest(BrokerTest): + """ + This subclass of BrokerTest adds some convenience test/check functions + """ + + def _chk_empty(self, queue, receiver): + """Check if a queue is empty (has no more messages)""" + try: + msg = receiver.fetch(timeout=0) + self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg)) + except Empty: + pass + + @staticmethod + def make_message(msg_count, msg_size): + """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count""" + msg = "msg-%04d" % msg_count + msg_len = len(msg) + buff = "" + if msg_size != None and msg_size > msg_len: + for index in range(0, msg_size - msg_len): + if index == msg_size - msg_len - 1: + buff += "-" + else: + buff += chr(ord('a') + (index % 26)) + return buff + msg + + # Functions for formatting address strings + + @staticmethod + def _fmt_csv(string_list, list_braces = None): + """Format a list using comma-separation. Braces are optionally added.""" + if len(string_list) == 0: + return "" + first = True + str_ = "" + if list_braces != None: + str_ += list_braces[0] + for string in string_list: + if string != None: + if first: + first = False + else: + str_ += ", " + str_ += string + if list_braces != None: + str_ += list_braces[1] + return str_ + + def _fmt_map(self, string_list): + """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map + element('key:val').""" + return self._fmt_csv(string_list, list_braces="{}") + + def _fmt_list(self, string_list): + """Format a list [l1, l2, l3, ...] from a string list.""" + return self._fmt_csv(string_list, list_braces="[]") + + def addr_fmt(self, node_name, **kwargs): + """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address + string.""" + # Get keyword args + node_subject = kwargs.get("node_subject") + create_policy = kwargs.get("create_policy") + delete_policy = kwargs.get("delete_policy") + assert_policy = kwargs.get("assert_policy") + mode = kwargs.get("mode") + link = kwargs.get("link", False) + link_name = kwargs.get("link_name") + node_type = kwargs.get("node_type") + durable = kwargs.get("durable", False) + link_reliability = kwargs.get("link_reliability") + x_declare_list = kwargs.get("x_declare_list", []) + x_bindings_list = kwargs.get("x_bindings_list", []) + x_subscribe_list = kwargs.get("x_subscribe_list", []) + + node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0) + link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or + len(x_bindings_list) > 0 or len(x_subscribe_list) > 0) + assert not (node_flag and link_flag) + + opt_str_list = [] + if create_policy != None: + opt_str_list.append("create: %s" % create_policy) + if delete_policy != None: + opt_str_list.append("delete: %s" % delete_policy) + if assert_policy != None: + opt_str_list.append("assert: %s" % assert_policy) + if mode != None: + opt_str_list.append("mode: %s" % mode) + if node_flag or link_flag: + node_str_list = [] + if link_name != None: + node_str_list.append("name: \"%s\"" % link_name) + if node_type != None: + node_str_list.append("type: %s" % node_type) + if durable: + node_str_list.append("durable: True") + if link_reliability != None: + node_str_list.append("reliability: %s" % link_reliability) + if len(x_declare_list) > 0: + node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list)) + if len(x_bindings_list) > 0: + node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list)) + if len(x_subscribe_list) > 0: + node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list)) + if node_flag: + opt_str_list.append("node: %s" % self._fmt_map(node_str_list)) + else: + opt_str_list.append("link: %s" % self._fmt_map(node_str_list)) + addr_str = node_name + if node_subject != None: + addr_str += "/%s" % node_subject + if len(opt_str_list) > 0: + addr_str += "; %s" % self._fmt_map(opt_str_list) + return addr_str + + def snd_addr(self, node_name, **kwargs): + """ Create a send (node) address""" + # Get keyword args + topic = kwargs.get("topic") + topic_flag = kwargs.get("topic_flag", False) + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + durable = kwargs.get("durable", False) + exclusive = kwargs.get("exclusive", False) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + exchage_type = kwargs.get("exchage_type") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + node_type = None + if topic != None or topic_flag: + node_type = "topic" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + if exchage_type != None: + x_declare_list.append("type: %s" % exchage_type) + + return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy, + node_type=node_type, durable=durable, x_declare_list=x_declare_list) + + def rcv_addr(self, node_name, **kwargs): + """ Create a receive (link) address""" + # Get keyword args + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + link_name = kwargs.get("link_name") + durable = kwargs.get("durable", False) + browse = kwargs.get("browse", False) + exclusive = kwargs.get("exclusive", False) + binding_list = kwargs.get("binding_list", []) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + mode = None + if browse: + mode = "browse" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + x_bindings_list = [] + for binding in binding_list: + x_bindings_list.append("{exchange: %s, key: %s}" % binding) + if durable: reliability = 'at-least-once' + else: reliability = None + return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, + link_name=link_name, durable=durable, x_declare_list=x_declare_list, + x_bindings_list=x_bindings_list, link_reliability=reliability) + + def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): + """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" + return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse) + + def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False, + emtpy_flag=False): + """Check that messages is on a queue by dequeuing them and comparing them to the expected messages""" + if emtpy_flag: + num_msgs = 0 + else: + num_msgs = len(exp_msg_list) + ssn = broker.connect().session(transactional=transactional) + rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs) + if num_msgs > 0: + try: + recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)] + except Empty: + self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i)) + for i in range(0, len(recieved_msg_list)): + self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content) + self.assertEqual(recieved_msg_list[i].correlation_id, exp_msg_list[i].correlation_id) + if empty: + self._chk_empty(queue, rcvr) + if ack: + ssn.acknowledge() + if transactional: + ssn.commit() + ssn.connection.close() + else: + if transactional: + ssn.commit() + return ssn + + + # Functions for finding strings in the broker log file (or other files) + + @staticmethod + def _read_file(file_name): + """Returns the content of file named file_name as a string""" + file_handle = file(file_name) + try: + return file_handle.read() + finally: + file_handle.close() + + def _get_hits(self, broker, search): + """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple + queues)""" + # TODO: Use sets when RHEL-4 is no longer supported + hits = [] + for hit in search.findall(self._read_file(broker.log)): + if hit not in hits: + hits.append(hit) + return hits + + def _reconsile_hits(self, broker, ftd_msgs, release_hits): + """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining + release_hits.""" + for msg in ftd_msgs: + found = False + for hit in release_hits: + if str(msg.id) in hit: + release_hits.remove(hit) + #print "Found %s in %s" % (msg.id, broker.log) + found = True + break + if not found: + self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log)) + if len(release_hits) > 0: + err = "Messages were unexpectedly released in log %s:\n" % broker.log + for hit in release_hits: + err += " %s\n" % hit + self.assert_(False, err) + + def check_msg_release(self, broker, ftd_msgs): + """ Check for 'Content released' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_commit(self, broker, ftd_msgs): + """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_recover(self, broker, ftd_msgs): + """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released after recovery$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block_on_commit(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) diff --git a/qpid/cpp/src/tests/linearstore/run_long_python_tests b/qpid/cpp/src/tests/linearstore/run_long_python_tests new file mode 100644 index 0000000000..be6380302c --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/run_long_python_tests @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests LONG_TEST diff --git a/qpid/cpp/src/tests/linearstore/run_python_tests b/qpid/cpp/src/tests/linearstore/run_python_tests new file mode 100755 index 0000000000..4ff212a71c --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/run_python_tests @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +source ${QPID_TEST_COMMON} + +ensure_python_tests + +#Add our directory to the python path +export PYTHONPATH=$srcdir/linearstore:${PYTHONPATH} + +MODULENAME=python_tests + +echo "Running Python tests in module ${MODULENAME}..." + +QPID_PORT=${QPID_PORT:-5672} +FAILING=${FAILING:-/dev/null} +PYTHON_TESTS=${PYTHON_TESTS:-$*} + +OUTDIR=${MODULENAME}.tmp +rm -rf ${OUTDIR} + +# To debug a test, add the following options to the end of the following line: +# -v DEBUG -c qpid.messaging.io.ops [*.testName] +${QPID_PYTHON_TEST} -m ${MODULENAME} -I ${FAILING} -DOUTDIR=${OUTDIR} ${PYTHON_TEST} || exit 1 + diff --git a/qpid/cpp/src/tests/linearstore/run_short_python_tests b/qpid/cpp/src/tests/linearstore/run_short_python_tests new file mode 100644 index 0000000000..9b9e7c59be --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/run_short_python_tests @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests SHORT_TEST diff --git a/qpid/cpp/src/tests/linearstore/tx-test-soak.sh b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh new file mode 100755 index 0000000000..7d5581961f --- /dev/null +++ b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh @@ -0,0 +1,275 @@ +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# tx-test-soak +# +# Basic test methodology: +# 1. Start broker +# 2. Run qpid-txtest against broker using randomly generated parameters +# 3. After some time, kill the broker using SIGKILL +# 4. Restart broker, recover messages +# 5. Run qpid-txtest against broker in check mode, which checks that all expected messages are present. +# 6. Wash, rinse, repeat... The number of runs is determined by ${NUM_RUNS} + +# NOTE: The following is based on typical development tree paths, not installed paths + +NUM_RUNS=1000 +BASE_DIR=${HOME}/RedHat +CMAKE_BUILD_DIR=${BASE_DIR}/q.cm + +# Infrequently adjusted +RESULT_BASE_DIR_PREFIX=${BASE_DIR}/results.tx-test-soak +RECOVER_TIME_PER_QUEUE=1 +STORE_MODULE="linearstore.so" +BROKER_LOG_LEVEL="info+" +BROKER_MANAGEMENT="no" # "no" or "yes" +TRUNCATE_INTERVAL=10 +MAX_DISK_PERC_USED=90 + +# Constants (don't adjust these) +export BASE_DIR +RELATIVE_BASE_DIR=`python -c "import os,os.path; print os.path.relpath(os.environ['BASE_DIR'], os.environ['PWD'])"` +export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py +LOG_FILE_NAME=log.txt +QPIDD_FN=qpidd +QPIDD=${CMAKE_BUILD_DIR}/src/${QPIDD_FN} +TXTEST_FN=qpid-txtest +TXTEST=${CMAKE_BUILD_DIR}/src/tests/${TXTEST_FN} +ANALYZE_FN=qpid_qls_analyze.py +ANALYZE=${BASE_DIR}/qpid/tools/src/py/${ANALYZE_FN} +ANALYZE_ARGS="--efp --show-recs --stats" +QPIDD_BASE_ARGS="--load-module ${STORE_MODULE} -m ${BROKER_MANAGEMENT} --auth no --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --store-dir ${BASE_DIR} --log-enable ${BROKER_LOG_LEVEL} --log-to-stderr no --log-to-stdout no" +TXTEST_INIT_STR="--init yes --transfer no --check no" +TXTEST_RUN_STR="--init no --transfer yes --check no" +TXTEST_CHK_STR="--init no --transfer no --check yes" +SUCCESS_MSG="All expected messages were retrieved." +TIMESTAMP_FORMAT="+%Y-%m-%d_%H:%M:%S" +ANSI_RED="\e[1;31m" +ANSI_NONE="\e[0m" +DEFAULT_EFP_DIR=2048k +DEFAULT_EFP_SIZE=2101248 +SIG_KILL=-9 +SIG_TERM=-15 + +# Creates a random number into the variable named in string $1 in the range [$2..$3] (both inclusive). +# $1: variable name as string to which random value is assigned +# $2: minimum inclusive range of random number +# $3: maximum inclusive range of random number +get_random() { + eval $1=`python -S -c "import random; print random.randint($2,$3)"` +} + +# Uses anon-uniform distribution to set a random message size. +# Most messages must be small (0 - 1k), but we need a few medium (10k) and large (100k) ones also. +# Sets message size into var ${MSG_SIZE} +set_message_size() { + local key=0 + get_random "key" 1 10 + if (( "${key}" == "10" )); then # 1 out of 10 - very large + get_random "MSG_SIZE" 100000 1000000 + FILE_SIZE_MULTIPLIER=3 + elif (( "${key}" >= "8" )); then # 2 out of 10 - large + get_random "MSG_SIZE" 10000 100000 + FILE_SIZE_MULTIPLIER=2 + elif (( "${key}" >= "6" )); then # 2 out of 10 - medium + get_random "MSG_SIZE" 1000 10000 + FILE_SIZE_MULTIPLIER=1 + else # 5 out of 10 - small + get_random "MSG_SIZE" 10 1000 + FILE_SIZE_MULTIPLIER=1 + fi +} + +# Start or restart broker +# $1: Log suffix: either "A" or "B". If "A", broker is started with truncation, otherwise broker is restarted with recovery. +# $2: Truncate flag - only used if Log suffix is "A": if true, then truncate store +# The PID of the broker is returned in ${QPIDD_PID} +start_broker() { + local truncate_val + local truncate_str + if [[ "$1" == "A" ]]; then + if [[ $2 == true ]]; then + truncate_val="yes" + truncate_str="(Store truncated)" + if [[ -e ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR} ]]; then + for f in ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR}/*; do + local filesize=`stat -c%s "${f}"` + if (( ${filesize} != ${DEFAULT_EFP_SIZE} )); then + rm ${f} + fi + done + fi + else + truncate_val="no" + fi + else + truncate_val="no" + fi + echo "${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log &" > ${RESULT_DIR}/qpidd.$1.cmd + ${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log & + QPIDD_PID=$! + echo "Broker PID=${QPIDD_PID} ${truncate_str}" | tee -a ${LOG_FILE} +} + +# Start or evaluate results of transaction test client +# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery. +start_tx_test() { + local tx_test_params="--messages-per-tx ${MSGS_PER_TX} --tx-count 1000000 --total-messages ${TOT_MSGS} --size ${MSG_SIZE} --queues ${NUM_QUEUES}" + if [[ "$1" == "A" ]]; then + # Run in background + echo "${TXTEST##*/} parameters: ${tx_test_params}" | tee -a ${LOG_FILE} + echo "${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log + echo "${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log &" >> ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log & + else + # Run in foreground + #echo "${TXTEST##*/} ${tx_test_params} ${TXTEST_CHK_STR}" | tee -a ${LOG_FILE} + echo "${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log + fi +} + +# Search for the presence of core.* files, move them into the current result directory and run gdb against them. +# No params +process_core_files() { + ls core.* &> /dev/null + if (( "$?" == "0" )); then + for cf in core.*; do + gdb --batch --quiet -ex "thread apply all bt" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb.txt + gdb --batch --quiet -ex "thread apply all bt full" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb-full.txt + cat ${RESULT_DIR}/${cf##*/}.gdb.txt + mv ${cf} ${RESULT_DIR}/ + echo "Core file ${cf##*/} found and recovered" + done + fi +} + +# Kill a process quietly +# $1: Signal +# $2: PID +kill_process() { + kill ${1} ${2} &>> ${LOG_FILE} + wait ${2} &>> ${LOG_FILE} +} + +# Check that test can run: No other copy of qpidd running, enough disk space +check_ready_to_run() { + # Check no copy of qpidd is running + PID=`pgrep ${QPIDD_FN}` + if [[ "$?" == "0" ]]; then + echo "ERROR: qpidd running as pid ${PID}" + exit 1 + fi + # Check disk is < 90% full + local perc_full=`df -h ${HOME} | tail -1 | awk '{print substr($5,0, length($5)-1)}'` + if (( ${perc_full} >= ${MAX_DISK_PERC_USED} )); then + echo "ERROR: Disk is too close to full (${perc_full}%)" + exit 2 + fi +} + +# Analyze store files +# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery. +analyze_store() { + ${ANALYZE} ${ANALYZE_ARGS} ${BASE_DIR}/qls &> ${RESULT_DIR}/qls_analysis.$1.log + echo >> ${RESULT_DIR}/qls_analysis.$1.log + echo "----------------------------------------------------------" >> ${RESULT_DIR}/qls_analysis.$1.log + echo "With transactional reconsiliation:" >> ${RESULT_DIR}/qls_analysis.$1.log + echo >> ${RESULT_DIR}/qls_analysis.$1.log + ${ANALYZE} ${ANALYZE_ARGS} --txn ${BASE_DIR}/qls &>> ${RESULT_DIR}/qls_analysis.$1.log +} + +ulimit -c unlimited # Allow core files to be created + +RESULT_BASE_DIR_SUFFIX=`date "${TIMESTAMP_FORMAT}"` +RESULT_BASE_DIR="${RESULT_BASE_DIR_PREFIX}.${RESULT_BASE_DIR_SUFFIX}" +LOG_FILE=${RESULT_BASE_DIR}/${LOG_FILE_NAME} +if [[ -n "${RESULT_BASE_DIR}" ]]; then + rm -rf ${RESULT_BASE_DIR} +fi + +mkdir -p ${RESULT_BASE_DIR} +for rn in `seq ${NUM_RUNS}`; do + # === Prepare result dir, check ready to run test, set run vars === + RESULT_DIR=${RESULT_BASE_DIR}/run_${rn} + mkdir -p ${RESULT_DIR} + check_ready_to_run + if (( (${rn} - 1) % ${TRUNCATE_INTERVAL} == 0 )) || [[ -n ${ERROR_FLAG} ]]; then + TRUNCATE_FLAG=true + else + TRUNCATE_FLAG=false + fi + set_message_size + get_random "MSGS_PER_TX" 1 20 + get_random "TOT_MSGS" 100 1000 + get_random "NUM_QUEUES" 2 15 + MIN_RUNTIME=$(( 20 * ${FILE_SIZE_MULTIPLIER} )) + MAX_RUNTIME=$(( 120 * ${FILE_SIZE_MULTIPLIER} )) + get_random "RUN_TIME" ${MIN_RUNTIME} ${MAX_RUNTIME} + RECOVER_TIME=$(( ${NUM_QUEUES} * ${RECOVER_TIME_PER_QUEUE} * ${FILE_SIZE_MULTIPLIER} )) + echo "Run ${rn} of ${NUM_RUNS} ==============" | tee -a ${LOG_FILE} + + # === PART A: Initial run of qpid-txtest === + start_broker "A" ${TRUNCATE_FLAG} + sleep ${RECOVER_TIME} # Need a way to test if broker has started here + start_tx_test "A" + echo "Running for ${RUN_TIME} secs..." | tee -a ${LOG_FILE} + sleep ${RUN_TIME} + kill_process ${SIG_KILL} ${QPIDD_PID} + sleep 2 + analyze_store "A" + tar -czf ${RESULT_DIR}/qls_A.tar.gz ${RELATIVE_BASE_DIR}/qls + + # === PART B: Recovery and check === + start_broker "B" + echo "Recover time=${RECOVER_TIME} secs..." | tee -a ${LOG_FILE} + sleep ${RECOVER_TIME} # Need a way to test if broker has started here + start_tx_test "B" + sleep 1 + kill_process ${SIG_TERM} ${QPIDD_PID} + sleep 2 + PID=`pgrep ${QPIDD_FN}` + if [[ "$?" == "0" ]]; then + kill_process ${SIG_KILL} ${PID} + sleep 2 + fi + analyze_store "B" + tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls + + # === Check for errors, cores and exceptions in logs === + grep -Hn "jexception" ${RESULT_DIR}/qpidd.A.log | tee -a ${LOG_FILE} + grep -Hn "jexception" ${RESULT_DIR}/qpidd.B.log | tee -a ${LOG_FILE} + grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.A.log | tee -a ${LOG_FILE} + grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.B.log | tee -a ${LOG_FILE} + grep "${SUCCESS_MSG}" ${RESULT_DIR}/txtest.B.log &> /dev/null + if [[ "$?" != "0" ]]; then + echo "ERROR in run ${rn}" >> ${LOG_FILE} + echo -e "${ANSI_RED}ERROR${ANSI_NONE} in run ${rn}" + ERROR_FLAG=true + else + unset ERROR_FLAG + fi + sleep 2 + process_core_files + echo | tee -a ${LOG_FILE} +done + |