path: root/qpid/cpp/src/tests/linearstore
diff options
Diffstat (limited to 'qpid/cpp/src/tests/linearstore')
9 files changed, 1122 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/linearstore/CMakeLists.txt b/qpid/cpp/src/tests/linearstore/CMakeLists.txt
new file mode 100644
index 0000000000..bf6c164818
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/CMakeLists.txt
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+message(STATUS "Building linearstore tests")
+set(test_wrap ${shell} ${CMAKE_SOURCE_DIR}/src/tests/run_test${test_script_suffix} -buildDir=${CMAKE_BINARY_DIR})
+add_test (linearstore_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/run_python_tests${test_script_suffix})
diff --git a/qpid/cpp/src/tests/linearstore/ b/qpid/cpp/src/tests/linearstore/
new file mode 100755
index 0000000000..ef39767e9b
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# This script sets up a test directory which contains both
+# recoverable and non-recoverable files and directories for
+# the empty file pool (EFP).
+# NOTE: The following is based on typical development tree paths, not installed paths
+export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py
+# Remove old dirs (if present)
+rm -rf ${STORE_DIR}/qls
+rm -rf ${STORE_DIR}/p002
+rm ${STORE_DIR}/p004
+# Create new dir tree and links
+mkdir ${STORE_DIR}/p002_ext
+touch ${STORE_DIR}/p004_ext
+mkdir ${STORE_DIR}/qls
+mkdir ${STORE_DIR}/qls/p001
+touch ${STORE_DIR}/qls/p003
+ln -s ${STORE_DIR}/p002_ext ${STORE_DIR}/qls/p002
+ln -s ${STORE_DIR}/p004_ext ${STORE_DIR}/qls/p004
+# Populate efp dirs with empty files
+${PYTHON_TOOLS_DIR}/ $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
+${PYTHON_TOOLS_DIR}/ $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
+${PYTHON_TOOLS_DIR}/ $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
+# Show the result for information
+${LINEARSTOREDIR}/tools/src/py/linearstore/ $STORE_DIR/qls/ -l
+tree -la $STORE_DIR/qls
diff --git a/qpid/cpp/src/tests/linearstore/python_tests/ b/qpid/cpp/src/tests/linearstore/python_tests/
new file mode 100644
index 0000000000..1e59d403e4
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/python_tests/
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Do not delete - marks this directory as a python package.
+from client_persistence import *
diff --git a/qpid/cpp/src/tests/linearstore/python_tests/ b/qpid/cpp/src/tests/linearstore/python_tests/
new file mode 100644
index 0000000000..9ff9480c4c
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/python_tests/
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import *
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client.
+class ExchangeQueueTests(StoreTest):
+ """
+ Simple tests of the broker exchange and queue types
+ """
+ def test_direct_exchange(self):
+ """Test Direct exchange."""
+ broker =, name="test_direct_exchange", expect=EXPECT_EXIT_OK)
+ msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
+ msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
+ broker.send_message("a", msg1)
+ broker.send_message("b", msg2)
+ broker.terminate()
+ broker =, name="test_direct_exchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg2, True)
+ def test_topic_exchange(self):
+ """Test Topic exchange."""
+ broker =, name="test_topic_exchange", expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}")
+ snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}")
+ ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}")
+ ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}")
+ ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, key:key1}, "
+ "{exchange:abc, key: key2}]}, node:{durable:True}}")
+ ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}")
+ ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}")
+ msg1 = Message("Message1", durable=True, correlation_id="Msg0003")
+ snd1.send(msg1)
+ msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
+ snd2.send(msg2)
+ broker.terminate()
+ broker =, name="test_topic_exchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg1, True)
+ self.check_messages(broker, "c", [msg1, msg2], True)
+ self.check_message(broker, "d", msg2, True)
+ self.check_message(broker, "e", msg2, True)
+ def test_legacy_lvq(self):
+ """Test legacy LVQ."""
+ broker =, name="test_lvq", expect=EXPECT_EXIT_OK)
+ ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
+ ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
+ mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
+ mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"})
+ mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"})
+ mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"})
+ broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+ xprops="arguments:{\"qpid.last_value_queue\":True}")
+ broker.terminate()
+ broker =, name="test_lvq", expect=EXPECT_EXIT_OK)
+ ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
+ # Add more messages while subscriber is active (no replacement):
+ ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
+ ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"})
+ mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"})
+ mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"})
+ mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"})
+ broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn)
+ ssn.acknowledge()
+ broker.terminate()
+ broker =, name="test_lvq")
+ self.check_messages(broker, "lvq-test", [ma4, mc4], True)
+ def test_fanout_exchange(self):
+ """Test Fanout Exchange"""
+ broker =, name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}")
+ msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
+ snd.send(msg1)
+ msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
+ snd.send(msg2)
+ broker.terminate()
+ broker =, name="test_fanout_exchange")
+ self.check_messages(broker, "q1", [msg1, msg2], True)
+ self.check_messages(broker, "q2", [msg1, msg2], True)
+ self.check_messages(broker, "q3", [msg1, msg2], True)
+ def test_message_reject(self):
+ broker =, name="test_message_reject", expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd = ssn.sender("tmr; {create:always, node:{type:queue, durable:True}}")
+ rcv = ssn.receiver("tmr; {create:always, node:{type:queue, durable:True}}")
+ m1 = Message("test_message_reject", durable=True, correlation_id="Msg0001")
+ snd.send(m1)
+ m2 = rcv.fetch()
+ ssn.acknowledge(message=m2, disposition=Disposition(REJECTED))
+ broker.terminate()
+ broker =, name="test_message_reject")
+ qmf = Qmf(broker)
+ assert qmf.queue_message_count("tmr") == 0
+ def test_route(self):
+ """ Test the recovery of a route (link and bridge objects."""
+ broker =, name="test_route", expect=EXPECT_EXIT_OK)
+ qmf = Qmf(broker)
+ qmf_broker_obj = qmf.get_objects("broker")[0]
+ # create a "link"
+ link_args = {"host":"", "port":9999, "durable":True,
+ "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+ "transport":"tcp"}
+ result = qmf_broker_obj.create("link", "test-link", link_args, False)
+ self.assertEqual(result.status, 0, result)
+ link = qmf.get_objects("link")[0]
+ # create bridge
+ bridge_args = {"link":"test-link", "src":"", "dest":"amq.fanout",
+ "key":"my-key", "durable":True}
+ result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False);
+ self.assertEqual(result.status, 0, result)
+ bridge = qmf.get_objects("bridge")[0]
+ broker.terminate()
+ # recover the link and bridge
+ broker =, name="test_route")
+ qmf = Qmf(broker)
+ qmf_broker_obj = qmf.get_objects("broker")[0]
+ self.assertEqual(len(qmf.get_objects("link")), 1)
+ self.assertEqual(len(qmf.get_objects("bridge")), 1)
+class AlternateExchangePropertyTests(StoreTest):
+ """
+ Test the persistence of the Alternate Exchange property for exchanges and queues.
+ """
+ def test_exchange(self):
+ """Exchange alternate exchange property persistence test"""
+ broker =, name="test_exchange", expect=EXPECT_EXIT_OK)
+ qmf = Qmf(broker)
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+ qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
+ qmf.close()
+ broker.terminate()
+ broker =, name="test_exchange")
+ qmf = Qmf(broker)
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+"Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+ try:
+ qmf.add_exchange("testExch", "direct", passive=True)
+ except Exception, error:
+"Test exchange (\"testExch\") instance not recovered: %s" % error)
+ self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
+ "Alternate exchange property not found or is incorrect on exchange \"testExch\".")
+ qmf.close()
+ def test_queue(self):
+ """Queue alternate exchange property persistexchangeNamece test"""
+ broker =, name="test_queue", expect=EXPECT_EXIT_OK)
+ qmf = Qmf(broker)
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+ qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
+ qmf.close()
+ broker.terminate()
+ broker =, name="test_queue")
+ qmf = Qmf(broker)
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+"Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+ try:
+ qmf.add_queue("testQueue", passive=True)
+ except Exception, error:
+"Test queue (\"testQueue\") instance not recovered: %s" % error)
+ self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"),
+ "Alternate exchange property not found or is incorrect on queue \"testQueue\".")
+ qmf.close()
+class RedeliveredTests(StoreTest):
+ """
+ Test the behavior of the redelivered flag in the context of persistence
+ """
+ def test_broker_recovery(self):
+ """Test that the redelivered flag is set on messages after recovery of broker"""
+ broker =, name="test_broker_recovery", expect=EXPECT_EXIT_OK)
+ msg_content = "xyz"*100
+ msg = Message(msg_content, durable=True)
+ broker.send_message("testQueue", msg)
+ broker.terminate()
+ broker =, name="test_broker_recovery")
+ rcv_msg = broker.get_message("testQueue")
+ self.assertEqual(msg_content, rcv_msg.content)
+ self.assertTrue(rcv_msg.redelivered)
diff --git a/qpid/cpp/src/tests/linearstore/python_tests/ b/qpid/cpp/src/tests/linearstore/python_tests/
new file mode 100644
index 0000000000..cc846aefd4
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/python_tests/
@@ -0,0 +1,417 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import re
+from brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
+def store_args(store_dir = None):
+ """Return the broker args necessary to load the async store"""
+ assert BrokerTest.store_lib
+ if store_dir == None:
+ return []
+ return ["--store-dir", store_dir]
+class Qmf:
+ """
+ QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
+ """
+ def __init__(self, broker):
+ self.__session = Session()
+ self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+ def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False,
+ arguments = None):
+ """Add a new exchange"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
+ alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
+ arguments=arguments)
+ else:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
+ arguments=arguments)
+ def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
+ """Add a new queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive,
+ durable=durable, arguments=arguments)
+ else:
+ amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
+ def delete_queue(self, queue_name):
+ """Delete an existing queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ amqp_session.queue_delete(queue_name)
+ def _query(self, name, _class, package, alt_exchange_name=None):
+ """Qmf query function which can optionally look for the presence of an alternate exchange name"""
+ try:
+ obj_list = self.__session.getObjects(_class=_class, _package=package)
+ found = False
+ for obj in obj_list:
+ if == name:
+ found = True
+ if alt_exchange_name != None:
+ alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange)
+ if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name:
+ return False
+ break
+ return found
+ except Exception:
+ return False
+ def query_exchange(self, exchange_name, alt_exchange_name=None):
+ """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
+ value."""
+ return self._query(exchange_name, "exchange", "", alt_exchange_name)
+ def query_queue(self, queue_name, alt_exchange_name=None):
+ """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
+ value."""
+ return self._query(queue_name, "queue", "", alt_exchange_name)
+ def queue_message_count(self, queue_name):
+ """Query the number of messages on a queue"""
+ queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
+ if len(queue_list):
+ return queue_list[0].msgDepth
+ def queue_empty(self, queue_name):
+ """Check if a queue is empty (has no messages waiting)"""
+ return self.queue_message_count(queue_name) == 0
+ def get_objects(self, target_class, target_package=""):
+ return self.__session.getObjects(_class=target_class, _package=target_package)
+ def close(self):
+ self.__session.delBroker(self.__broker)
+ self.__session = None
+class StoreTest(BrokerTest):
+ """
+ This subclass of BrokerTest adds some convenience test/check functions
+ """
+ def _chk_empty(self, queue, receiver):
+ """Check if a queue is empty (has no more messages)"""
+ try:
+ msg = receiver.fetch(timeout=0)
+ self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg))
+ except Empty:
+ pass
+ @staticmethod
+ def make_message(msg_count, msg_size):
+ """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count"""
+ msg = "msg-%04d" % msg_count
+ msg_len = len(msg)
+ buff = ""
+ if msg_size != None and msg_size > msg_len:
+ for index in range(0, msg_size - msg_len):
+ if index == msg_size - msg_len - 1:
+ buff += "-"
+ else:
+ buff += chr(ord('a') + (index % 26))
+ return buff + msg
+ # Functions for formatting address strings
+ @staticmethod
+ def _fmt_csv(string_list, list_braces = None):
+ """Format a list using comma-separation. Braces are optionally added."""
+ if len(string_list) == 0:
+ return ""
+ first = True
+ str_ = ""
+ if list_braces != None:
+ str_ += list_braces[0]
+ for string in string_list:
+ if string != None:
+ if first:
+ first = False
+ else:
+ str_ += ", "
+ str_ += string
+ if list_braces != None:
+ str_ += list_braces[1]
+ return str_
+ def _fmt_map(self, string_list):
+ """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
+ element('key:val')."""
+ return self._fmt_csv(string_list, list_braces="{}")
+ def _fmt_list(self, string_list):
+ """Format a list [l1, l2, l3, ...] from a string list."""
+ return self._fmt_csv(string_list, list_braces="[]")
+ def addr_fmt(self, node_name, **kwargs):
+ """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
+ string."""
+ # Get keyword args
+ node_subject = kwargs.get("node_subject")
+ create_policy = kwargs.get("create_policy")
+ delete_policy = kwargs.get("delete_policy")
+ assert_policy = kwargs.get("assert_policy")
+ mode = kwargs.get("mode")
+ link = kwargs.get("link", False)
+ link_name = kwargs.get("link_name")
+ node_type = kwargs.get("node_type")
+ durable = kwargs.get("durable", False)
+ link_reliability = kwargs.get("link_reliability")
+ x_declare_list = kwargs.get("x_declare_list", [])
+ x_bindings_list = kwargs.get("x_bindings_list", [])
+ x_subscribe_list = kwargs.get("x_subscribe_list", [])
+ node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
+ link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
+ len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
+ assert not (node_flag and link_flag)
+ opt_str_list = []
+ if create_policy != None:
+ opt_str_list.append("create: %s" % create_policy)
+ if delete_policy != None:
+ opt_str_list.append("delete: %s" % delete_policy)
+ if assert_policy != None:
+ opt_str_list.append("assert: %s" % assert_policy)
+ if mode != None:
+ opt_str_list.append("mode: %s" % mode)
+ if node_flag or link_flag:
+ node_str_list = []
+ if link_name != None:
+ node_str_list.append("name: \"%s\"" % link_name)
+ if node_type != None:
+ node_str_list.append("type: %s" % node_type)
+ if durable:
+ node_str_list.append("durable: True")
+ if link_reliability != None:
+ node_str_list.append("reliability: %s" % link_reliability)
+ if len(x_declare_list) > 0:
+ node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list))
+ if len(x_bindings_list) > 0:
+ node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list))
+ if len(x_subscribe_list) > 0:
+ node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list))
+ if node_flag:
+ opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+ else:
+ opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+ addr_str = node_name
+ if node_subject != None:
+ addr_str += "/%s" % node_subject
+ if len(opt_str_list) > 0:
+ addr_str += "; %s" % self._fmt_map(opt_str_list)
+ return addr_str
+ def snd_addr(self, node_name, **kwargs):
+ """ Create a send (node) address"""
+ # Get keyword args
+ topic = kwargs.get("topic")
+ topic_flag = kwargs.get("topic_flag", False)
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ durable = kwargs.get("durable", False)
+ exclusive = kwargs.get("exclusive", False)
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+ exchage_type = kwargs.get("exchage_type")
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ node_type = None
+ if topic != None or topic_flag:
+ node_type = "topic"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+ x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+ if exchage_type != None:
+ x_declare_list.append("type: %s" % exchage_type)
+ return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy,
+ node_type=node_type, durable=durable, x_declare_list=x_declare_list)
+ def rcv_addr(self, node_name, **kwargs):
+ """ Create a receive (link) address"""
+ # Get keyword args
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ link_name = kwargs.get("link_name")
+ durable = kwargs.get("durable", False)
+ browse = kwargs.get("browse", False)
+ exclusive = kwargs.get("exclusive", False)
+ binding_list = kwargs.get("binding_list", [])
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ mode = None
+ if browse:
+ mode = "browse"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+ x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+ x_bindings_list = []
+ for binding in binding_list:
+ x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+ if durable: reliability = 'at-least-once'
+ else: reliability = None
+ return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
+ link_name=link_name, durable=durable, x_declare_list=x_declare_list,
+ x_bindings_list=x_bindings_list, link_reliability=reliability)
+ def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
+ """Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
+ return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
+ def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
+ emtpy_flag=False):
+ """Check that messages is on a queue by dequeuing them and comparing them to the expected messages"""
+ if emtpy_flag:
+ num_msgs = 0
+ else:
+ num_msgs = len(exp_msg_list)
+ ssn = broker.connect().session(transactional=transactional)
+ rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
+ if num_msgs > 0:
+ try:
+ recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
+ except Empty:
+ self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
+ for i in range(0, len(recieved_msg_list)):
+ self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content)
+ self.assertEqual(recieved_msg_list[i].correlation_id, exp_msg_list[i].correlation_id)
+ if empty:
+ self._chk_empty(queue, rcvr)
+ if ack:
+ ssn.acknowledge()
+ if transactional:
+ ssn.commit()
+ ssn.connection.close()
+ else:
+ if transactional:
+ ssn.commit()
+ return ssn
+ # Functions for finding strings in the broker log file (or other files)
+ @staticmethod
+ def _read_file(file_name):
+ """Returns the content of file named file_name as a string"""
+ file_handle = file(file_name)
+ try:
+ return
+ finally:
+ file_handle.close()
+ def _get_hits(self, broker, search):
+ """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
+ queues)"""
+ # TODO: Use sets when RHEL-4 is no longer supported
+ hits = []
+ for hit in search.findall(self._read_file(broker.log)):
+ if hit not in hits:
+ hits.append(hit)
+ return hits
+ def _reconsile_hits(self, broker, ftd_msgs, release_hits):
+ """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
+ release_hits."""
+ for msg in ftd_msgs:
+ found = False
+ for hit in release_hits:
+ if str( in hit:
+ release_hits.remove(hit)
+ #print "Found %s in %s" % (, broker.log)
+ found = True
+ break
+ if not found:
+ self.assert_(False, "Unable to locate released message %s in log %s" % (, broker.log))
+ if len(release_hits) > 0:
+ err = "Messages were unexpectedly released in log %s:\n" % broker.log
+ for hit in release_hits:
+ err += " %s\n" % hit
+ self.assert_(False, err)
+ def check_msg_release(self, broker, ftd_msgs):
+ """ Check for 'Content released' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+ def check_msg_release_on_commit(self, broker, ftd_msgs):
+ """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released on commit$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+ def check_msg_release_on_recover(self, broker, ftd_msgs):
+ """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released after recovery$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+ def check_msg_block(self, broker, ftd_msgs):
+ """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+ def check_msg_block_on_commit(self, broker, ftd_msgs):
+ """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked on commit$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
diff --git a/qpid/cpp/src/tests/linearstore/run_long_python_tests b/qpid/cpp/src/tests/linearstore/run_long_python_tests
new file mode 100644
index 0000000000..be6380302c
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/run_long_python_tests
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+./run_python_tests LONG_TEST
diff --git a/qpid/cpp/src/tests/linearstore/run_python_tests b/qpid/cpp/src/tests/linearstore/run_python_tests
new file mode 100755
index 0000000000..4ff212a71c
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/run_python_tests
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#Add our directory to the python path
+export PYTHONPATH=$srcdir/linearstore:${PYTHONPATH}
+echo "Running Python tests in module ${MODULENAME}..."
+rm -rf ${OUTDIR}
+# To debug a test, add the following options to the end of the following line:
+# -v DEBUG -c [*.testName]
diff --git a/qpid/cpp/src/tests/linearstore/run_short_python_tests b/qpid/cpp/src/tests/linearstore/run_short_python_tests
new file mode 100644
index 0000000000..9b9e7c59be
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/run_short_python_tests
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+./run_python_tests SHORT_TEST
diff --git a/qpid/cpp/src/tests/linearstore/ b/qpid/cpp/src/tests/linearstore/
new file mode 100755
index 0000000000..7d5581961f
--- /dev/null
+++ b/qpid/cpp/src/tests/linearstore/
@@ -0,0 +1,275 @@
+#! /bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# tx-test-soak
+# Basic test methodology:
+# 1. Start broker
+# 2. Run qpid-txtest against broker using randomly generated parameters
+# 3. After some time, kill the broker using SIGKILL
+# 4. Restart broker, recover messages
+# 5. Run qpid-txtest against broker in check mode, which checks that all expected messages are present.
+# 6. Wash, rinse, repeat... The number of runs is determined by ${NUM_RUNS}
+# NOTE: The following is based on typical development tree paths, not installed paths
+# Infrequently adjusted
+BROKER_MANAGEMENT="no" # "no" or "yes"
+# Constants (don't adjust these)
+export BASE_DIR
+RELATIVE_BASE_DIR=`python -c "import os,os.path; print os.path.relpath(os.environ['BASE_DIR'], os.environ['PWD'])"`
+export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py
+ANALYZE_ARGS="--efp --show-recs --stats"
+QPIDD_BASE_ARGS="--load-module ${STORE_MODULE} -m ${BROKER_MANAGEMENT} --auth no --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --store-dir ${BASE_DIR} --log-enable ${BROKER_LOG_LEVEL} --log-to-stderr no --log-to-stdout no"
+TXTEST_INIT_STR="--init yes --transfer no --check no"
+TXTEST_RUN_STR="--init no --transfer yes --check no"
+TXTEST_CHK_STR="--init no --transfer no --check yes"
+SUCCESS_MSG="All expected messages were retrieved."
+# Creates a random number into the variable named in string $1 in the range [$2..$3] (both inclusive).
+# $1: variable name as string to which random value is assigned
+# $2: minimum inclusive range of random number
+# $3: maximum inclusive range of random number
+get_random() {
+ eval $1=`python -S -c "import random; print random.randint($2,$3)"`
+# Uses anon-uniform distribution to set a random message size.
+# Most messages must be small (0 - 1k), but we need a few medium (10k) and large (100k) ones also.
+# Sets message size into var ${MSG_SIZE}
+set_message_size() {
+ local key=0
+ get_random "key" 1 10
+ if (( "${key}" == "10" )); then # 1 out of 10 - very large
+ get_random "MSG_SIZE" 100000 1000000
+ elif (( "${key}" >= "8" )); then # 2 out of 10 - large
+ get_random "MSG_SIZE" 10000 100000
+ elif (( "${key}" >= "6" )); then # 2 out of 10 - medium
+ get_random "MSG_SIZE" 1000 10000
+ else # 5 out of 10 - small
+ get_random "MSG_SIZE" 10 1000
+ fi
+# Start or restart broker
+# $1: Log suffix: either "A" or "B". If "A", broker is started with truncation, otherwise broker is restarted with recovery.
+# $2: Truncate flag - only used if Log suffix is "A": if true, then truncate store
+# The PID of the broker is returned in ${QPIDD_PID}
+start_broker() {
+ local truncate_val
+ local truncate_str
+ if [[ "$1" == "A" ]]; then
+ if [[ $2 == true ]]; then
+ truncate_val="yes"
+ truncate_str="(Store truncated)"
+ if [[ -e ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR} ]]; then
+ for f in ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR}/*; do
+ local filesize=`stat -c%s "${f}"`
+ if (( ${filesize} != ${DEFAULT_EFP_SIZE} )); then
+ rm ${f}
+ fi
+ done
+ fi
+ else
+ truncate_val="no"
+ fi
+ else
+ truncate_val="no"
+ fi
+ echo "${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log &" > ${RESULT_DIR}/qpidd.$1.cmd
+ ${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log &
+ echo "Broker PID=${QPIDD_PID} ${truncate_str}" | tee -a ${LOG_FILE}
+# Start or evaluate results of transaction test client
+# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery.
+start_tx_test() {
+ local tx_test_params="--messages-per-tx ${MSGS_PER_TX} --tx-count 1000000 --total-messages ${TOT_MSGS} --size ${MSG_SIZE} --queues ${NUM_QUEUES}"
+ if [[ "$1" == "A" ]]; then
+ # Run in background
+ echo "${TXTEST##*/} parameters: ${tx_test_params}" | tee -a ${LOG_FILE}
+ echo "${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd
+ ${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log
+ echo "${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log &" >> ${RESULT_DIR}/txtest.$1.cmd
+ ${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log &
+ else
+ # Run in foreground
+ #echo "${TXTEST##*/} ${tx_test_params} ${TXTEST_CHK_STR}" | tee -a ${LOG_FILE}
+ echo "${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd
+ ${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log
+ fi
+# Search for the presence of core.* files, move them into the current result directory and run gdb against them.
+# No params
+process_core_files() {
+ ls core.* &> /dev/null
+ if (( "$?" == "0" )); then
+ for cf in core.*; do
+ gdb --batch --quiet -ex "thread apply all bt" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb.txt
+ gdb --batch --quiet -ex "thread apply all bt full" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb-full.txt
+ cat ${RESULT_DIR}/${cf##*/}.gdb.txt
+ mv ${cf} ${RESULT_DIR}/
+ echo "Core file ${cf##*/} found and recovered"
+ done
+ fi
+# Kill a process quietly
+# $1: Signal
+# $2: PID
+kill_process() {
+ kill ${1} ${2} &>> ${LOG_FILE}
+ wait ${2} &>> ${LOG_FILE}
+# Check that test can run: No other copy of qpidd running, enough disk space
+check_ready_to_run() {
+ # Check no copy of qpidd is running
+ PID=`pgrep ${QPIDD_FN}`
+ if [[ "$?" == "0" ]]; then
+ echo "ERROR: qpidd running as pid ${PID}"
+ exit 1
+ fi
+ # Check disk is < 90% full
+ local perc_full=`df -h ${HOME} | tail -1 | awk '{print substr($5,0, length($5)-1)}'`
+ if (( ${perc_full} >= ${MAX_DISK_PERC_USED} )); then
+ echo "ERROR: Disk is too close to full (${perc_full}%)"
+ exit 2
+ fi
+# Analyze store files
+# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery.
+analyze_store() {
+ ${ANALYZE} ${ANALYZE_ARGS} ${BASE_DIR}/qls &> ${RESULT_DIR}/qls_analysis.$1.log
+ echo >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo "----------------------------------------------------------" >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo "With transactional reconsiliation:" >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo >> ${RESULT_DIR}/qls_analysis.$1.log
+ ${ANALYZE} ${ANALYZE_ARGS} --txn ${BASE_DIR}/qls &>> ${RESULT_DIR}/qls_analysis.$1.log
+ulimit -c unlimited # Allow core files to be created
+if [[ -n "${RESULT_BASE_DIR}" ]]; then
+ rm -rf ${RESULT_BASE_DIR}
+mkdir -p ${RESULT_BASE_DIR}
+for rn in `seq ${NUM_RUNS}`; do
+ # === Prepare result dir, check ready to run test, set run vars ===
+ mkdir -p ${RESULT_DIR}
+ check_ready_to_run
+ if (( (${rn} - 1) % ${TRUNCATE_INTERVAL} == 0 )) || [[ -n ${ERROR_FLAG} ]]; then
+ else
+ fi
+ set_message_size
+ get_random "MSGS_PER_TX" 1 20
+ get_random "TOT_MSGS" 100 1000
+ get_random "NUM_QUEUES" 2 15
+ echo "Run ${rn} of ${NUM_RUNS} ==============" | tee -a ${LOG_FILE}
+ # === PART A: Initial run of qpid-txtest ===
+ start_broker "A" ${TRUNCATE_FLAG}
+ sleep ${RECOVER_TIME} # Need a way to test if broker has started here
+ start_tx_test "A"
+ echo "Running for ${RUN_TIME} secs..." | tee -a ${LOG_FILE}
+ sleep ${RUN_TIME}
+ kill_process ${SIG_KILL} ${QPIDD_PID}
+ sleep 2
+ analyze_store "A"
+ tar -czf ${RESULT_DIR}/qls_A.tar.gz ${RELATIVE_BASE_DIR}/qls
+ # === PART B: Recovery and check ===
+ start_broker "B"
+ echo "Recover time=${RECOVER_TIME} secs..." | tee -a ${LOG_FILE}
+ sleep ${RECOVER_TIME} # Need a way to test if broker has started here
+ start_tx_test "B"
+ sleep 1
+ kill_process ${SIG_TERM} ${QPIDD_PID}
+ sleep 2
+ PID=`pgrep ${QPIDD_FN}`
+ if [[ "$?" == "0" ]]; then
+ kill_process ${SIG_KILL} ${PID}
+ sleep 2
+ fi
+ analyze_store "B"
+ tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls
+ # === Check for errors, cores and exceptions in logs ===
+ grep -Hn "jexception" ${RESULT_DIR}/qpidd.A.log | tee -a ${LOG_FILE}
+ grep -Hn "jexception" ${RESULT_DIR}/qpidd.B.log | tee -a ${LOG_FILE}
+ grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.A.log | tee -a ${LOG_FILE}
+ grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.B.log | tee -a ${LOG_FILE}
+ grep "${SUCCESS_MSG}" ${RESULT_DIR}/txtest.B.log &> /dev/null
+ if [[ "$?" != "0" ]]; then
+ echo "ERROR in run ${rn}" >> ${LOG_FILE}
+ echo -e "${ANSI_RED}ERROR${ANSI_NONE} in run ${rn}"
+ else
+ unset ERROR_FLAG
+ fi
+ sleep 2
+ process_core_files
+ echo | tee -a ${LOG_FILE}