diff options
author | Kim van der Riet <kpvdr@apache.org> | 2014-09-16 13:47:02 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2014-09-16 13:47:02 +0000 |
commit | 58445eab763b395dca457de8e49d1f425a77daa5 (patch) | |
tree | db7c68d0f6a67f5020dd63d71a6155f54cb3e56b | |
parent | 03c8f2a4422f3c33f3a9e09ed9ff93e87fb29da1 (diff) | |
download | qpid-python-58445eab763b395dca457de8e49d1f425a77daa5.tar.gz |
QPID-5361: No tests for linearstore functionality currently exist - basic tests from the legacystore suite ported to linearstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1625283 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/qpid/linearstore/ISSUES | 5 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/CMakeLists.txt | 29 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/python_tests/__init__.py | 23 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/python_tests/client_persistence.py | 239 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/python_tests/store_test.py | 417 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/run_long_python_tests | 21 | ||||
-rwxr-xr-x | cpp/src/tests/linearstore/run_python_tests | 42 | ||||
-rw-r--r-- | cpp/src/tests/linearstore/run_short_python_tests | 21 |
9 files changed, 797 insertions, 1 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 568cec84b8..3031951ed3 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1280,6 +1280,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake add_subdirectory(qpid/store) add_subdirectory(tests) add_subdirectory(tests/legacystore) +add_subdirectory(tests/linearstore) # Support for pkg-config diff --git a/cpp/src/qpid/linearstore/ISSUES b/cpp/src/qpid/linearstore/ISSUES index 2866e6c58d..da169f57d6 100644 --- a/cpp/src/qpid/linearstore/ISSUES +++ b/cpp/src/qpid/linearstore/ISSUES @@ -132,6 +132,8 @@ NO-JIRA - Added missing Apache copyright/license text 5948 1121660 [AMQP 1.0] Broker restart failure with durable topic using non-durable exchange svn r.1616287 2014-08-06 Proposed solution checked in by gsim This turned out to be an AMQP error, fix does not affect store code. + 6043 1089652 [RFE]: Configuration option for linear store to delete or overwrite the used journal files. + svn r.1620426 2014-08-25 Proposed solution Ordered checkin list: @@ -168,10 +170,11 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 26. 1584379 5661 - 2014-04-03 27. 1594215 5750 1078142 2014-05-13 0.22-mrg 28. 1596509 5767 1098118 2014-05-21 0.22-mrg (pmoravec) -29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update) +29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update) 30. 1599243 5767 1098118 2014-06-02 0.22-mrg 30. 1599243 5767 1098118 2014-06-02 31. 1614665 5924 1124906 2014-07-30 +32. 1620426 6043 1089652 2014-08-25 See above sections for details on these checkins. diff --git a/cpp/src/tests/linearstore/CMakeLists.txt b/cpp/src/tests/linearstore/CMakeLists.txt new file mode 100644 index 0000000000..996a8c63c1 --- /dev/null +++ b/cpp/src/tests/linearstore/CMakeLists.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if(BUILD_LINEARSTORE AND BUILD_TESTING) + +message(STATUS "Building linearstore tests") + +set(test_wrap ${shell} ${CMAKE_SOURCE_DIR}/src/tests/run_test${test_script_suffix} --build-dir=${CMAKE_BINARY_DIR}) + +add_test (linearstore_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/run_python_tests${test_script_suffix}) + +endif (BUILD_LINEARSTORE AND BUILD_TESTING) + diff --git a/cpp/src/tests/linearstore/python_tests/__init__.py b/cpp/src/tests/linearstore/python_tests/__init__.py new file mode 100644 index 0000000000..1e59d403e4 --- /dev/null +++ b/cpp/src/tests/linearstore/python_tests/__init__.py @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Do not delete - marks this directory as a python package. + +from client_persistence import * + diff --git a/cpp/src/tests/linearstore/python_tests/client_persistence.py b/cpp/src/tests/linearstore/python_tests/client_persistence.py new file mode 100644 index 0000000000..9ff9480c4c --- /dev/null +++ b/cpp/src/tests/linearstore/python_tests/client_persistence.py @@ -0,0 +1,239 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os + +from brokertest import EXPECT_EXIT_OK +from store_test import StoreTest, Qmf, store_args +from qpid.messaging import * + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client. + +class ExchangeQueueTests(StoreTest): + """ + Simple tests of the broker exchange and queue types + """ + + def test_direct_exchange(self): + """Test Direct exchange.""" + broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) + msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001") + msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002") + broker.send_message("a", msg1) + broker.send_message("b", msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_direct_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg2, True) + + def test_topic_exchange(self): + """Test Topic exchange.""" + broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}") + snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}") + ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}") + ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, key:key1}, " + "{exchange:abc, key: key2}]}, node:{durable:True}}") + ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}") + msg1 = Message("Message1", durable=True, correlation_id="Msg0003") + snd1.send(msg1) + msg2 = Message("Message2", durable=True, correlation_id="Msg0004") + snd2.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_topic_exchange") + self.check_message(broker, "a", msg1, True) + self.check_message(broker, "b", msg1, True) + self.check_messages(broker, "c", [msg1, msg2], True) + self.check_message(broker, "d", msg2, True) + self.check_message(broker, "e", msg2, True) + + + def test_legacy_lvq(self): + """Test legacy LVQ.""" + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) + ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) + mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"}) + mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"}) + mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"}) + mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], + xprops="arguments:{\"qpid.last_value_queue\":True}") + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) + ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) + # Add more messages while subscriber is active (no replacement): + ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"}) + ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"}) + mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"}) + mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"}) + mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"}) + broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn) + ssn.acknowledge() + broker.terminate() + + broker = self.broker(store_args(), name="test_lvq") + self.check_messages(broker, "lvq-test", [ma4, mc4], True) + + + def test_fanout_exchange(self): + """Test Fanout Exchange""" + broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}") + msg1 = Message("Msg1", durable=True, correlation_id="Msg0001") + snd.send(msg1) + msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") + snd.send(msg2) + broker.terminate() + + broker = self.broker(store_args(), name="test_fanout_exchange") + self.check_messages(broker, "q1", [msg1, msg2], True) + self.check_messages(broker, "q2", [msg1, msg2], True) + self.check_messages(broker, "q3", [msg1, msg2], True) + + + def test_message_reject(self): + broker = self.broker(store_args(), name="test_message_reject", expect=EXPECT_EXIT_OK) + ssn = broker.connect().session() + snd = ssn.sender("tmr; {create:always, node:{type:queue, durable:True}}") + rcv = ssn.receiver("tmr; {create:always, node:{type:queue, durable:True}}") + m1 = Message("test_message_reject", durable=True, correlation_id="Msg0001") + snd.send(m1) + m2 = rcv.fetch() + ssn.acknowledge(message=m2, disposition=Disposition(REJECTED)) + broker.terminate() + + broker = self.broker(store_args(), name="test_message_reject") + qmf = Qmf(broker) + assert qmf.queue_message_count("tmr") == 0 + + + def test_route(self): + """ Test the recovery of a route (link and bridge objects.""" + broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + + # create a "link" + link_args = {"host":"a.fake.host.com", "port":9999, "durable":True, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = qmf_broker_obj.create("link", "test-link", link_args, False) + self.assertEqual(result.status, 0, result) + link = qmf.get_objects("link")[0] + + # create bridge + bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key", "durable":True} + result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False); + self.assertEqual(result.status, 0, result) + bridge = qmf.get_objects("bridge")[0] + + broker.terminate() + + # recover the link and bridge + broker = self.broker(store_args(), name="test_route") + qmf = Qmf(broker) + qmf_broker_obj = qmf.get_objects("broker")[0] + self.assertEqual(len(qmf.get_objects("link")), 1) + self.assertEqual(len(qmf.get_objects("bridge")), 1) + + + +class AlternateExchangePropertyTests(StoreTest): + """ + Test the persistence of the Alternate Exchange property for exchanges and queues. + """ + + def test_exchange(self): + """Exchange alternate exchange property persistence test""" + broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_exchange") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_exchange("testExch", "direct", passive=True) + except Exception, error: + self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on exchange \"testExch\".") + qmf.close() + + def test_queue(self): + """Queue alternate exchange property persistexchangeNamece test""" + broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK) + qmf = Qmf(broker) + qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance + qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch") + qmf.close() + broker.terminate() + + broker = self.broker(store_args(), name="test_queue") + qmf = Qmf(broker) + try: + qmf.add_exchange("altExch", "direct", passive=True) + except Exception, error: + self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error) + try: + qmf.add_queue("testQueue", passive=True) + except Exception, error: + self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error) + self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"), + "Alternate exchange property not found or is incorrect on queue \"testQueue\".") + qmf.close() + + +class RedeliveredTests(StoreTest): + """ + Test the behavior of the redelivered flag in the context of persistence + """ + + def test_broker_recovery(self): + """Test that the redelivered flag is set on messages after recovery of broker""" + broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + broker = self.broker(store_args(), name="test_broker_recovery") + rcv_msg = broker.get_message("testQueue") + self.assertEqual(msg_content, rcv_msg.content) + self.assertTrue(rcv_msg.redelivered) + diff --git a/cpp/src/tests/linearstore/python_tests/store_test.py b/cpp/src/tests/linearstore/python_tests/store_test.py new file mode 100644 index 0000000000..cc846aefd4 --- /dev/null +++ b/cpp/src/tests/linearstore/python_tests/store_test.py @@ -0,0 +1,417 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import re +from brokertest import BrokerTest +from qpid.messaging import Empty +from qmf.console import Session + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + + +def store_args(store_dir = None): + """Return the broker args necessary to load the async store""" + assert BrokerTest.store_lib + if store_dir == None: + return [] + return ["--store-dir", store_dir] + +class Qmf: + """ + QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available. + """ + def __init__(self, broker): + self.__session = Session() + self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port()) + + def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False, + arguments = None): + """Add a new exchange""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, + alternate_exchange=alt_exchange_name, passive=passive, durable=durable, + arguments=arguments) + else: + amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable, + arguments=arguments) + + def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None): + """Add a new queue""" + amqp_session = self.__broker.getAmqpSession() + if arguments == None: + arguments = {} + if alt_exchange_name: + amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive, + durable=durable, arguments=arguments) + else: + amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments) + + def delete_queue(self, queue_name): + """Delete an existing queue""" + amqp_session = self.__broker.getAmqpSession() + amqp_session.queue_delete(queue_name) + + def _query(self, name, _class, package, alt_exchange_name=None): + """Qmf query function which can optionally look for the presence of an alternate exchange name""" + try: + obj_list = self.__session.getObjects(_class=_class, _package=package) + found = False + for obj in obj_list: + if obj.name == name: + found = True + if alt_exchange_name != None: + alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange) + if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name: + return False + break + return found + except Exception: + return False + + + def query_exchange(self, exchange_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name) + + def query_queue(self, queue_name, alt_exchange_name=None): + """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known + value.""" + return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name) + + def queue_message_count(self, queue_name): + """Query the number of messages on a queue""" + queue_list = self.__session.getObjects(_class="queue", _name=queue_name) + if len(queue_list): + return queue_list[0].msgDepth + + def queue_empty(self, queue_name): + """Check if a queue is empty (has no messages waiting)""" + return self.queue_message_count(queue_name) == 0 + + def get_objects(self, target_class, target_package="org.apache.qpid.broker"): + return self.__session.getObjects(_class=target_class, _package=target_package) + + + def close(self): + self.__session.delBroker(self.__broker) + self.__session = None + + +class StoreTest(BrokerTest): + """ + This subclass of BrokerTest adds some convenience test/check functions + """ + + def _chk_empty(self, queue, receiver): + """Check if a queue is empty (has no more messages)""" + try: + msg = receiver.fetch(timeout=0) + self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg)) + except Empty: + pass + + @staticmethod + def make_message(msg_count, msg_size): + """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count""" + msg = "msg-%04d" % msg_count + msg_len = len(msg) + buff = "" + if msg_size != None and msg_size > msg_len: + for index in range(0, msg_size - msg_len): + if index == msg_size - msg_len - 1: + buff += "-" + else: + buff += chr(ord('a') + (index % 26)) + return buff + msg + + # Functions for formatting address strings + + @staticmethod + def _fmt_csv(string_list, list_braces = None): + """Format a list using comma-separation. Braces are optionally added.""" + if len(string_list) == 0: + return "" + first = True + str_ = "" + if list_braces != None: + str_ += list_braces[0] + for string in string_list: + if string != None: + if first: + first = False + else: + str_ += ", " + str_ += string + if list_braces != None: + str_ += list_braces[1] + return str_ + + def _fmt_map(self, string_list): + """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map + element('key:val').""" + return self._fmt_csv(string_list, list_braces="{}") + + def _fmt_list(self, string_list): + """Format a list [l1, l2, l3, ...] from a string list.""" + return self._fmt_csv(string_list, list_braces="[]") + + def addr_fmt(self, node_name, **kwargs): + """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address + string.""" + # Get keyword args + node_subject = kwargs.get("node_subject") + create_policy = kwargs.get("create_policy") + delete_policy = kwargs.get("delete_policy") + assert_policy = kwargs.get("assert_policy") + mode = kwargs.get("mode") + link = kwargs.get("link", False) + link_name = kwargs.get("link_name") + node_type = kwargs.get("node_type") + durable = kwargs.get("durable", False) + link_reliability = kwargs.get("link_reliability") + x_declare_list = kwargs.get("x_declare_list", []) + x_bindings_list = kwargs.get("x_bindings_list", []) + x_subscribe_list = kwargs.get("x_subscribe_list", []) + + node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0) + link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or + len(x_bindings_list) > 0 or len(x_subscribe_list) > 0) + assert not (node_flag and link_flag) + + opt_str_list = [] + if create_policy != None: + opt_str_list.append("create: %s" % create_policy) + if delete_policy != None: + opt_str_list.append("delete: %s" % delete_policy) + if assert_policy != None: + opt_str_list.append("assert: %s" % assert_policy) + if mode != None: + opt_str_list.append("mode: %s" % mode) + if node_flag or link_flag: + node_str_list = [] + if link_name != None: + node_str_list.append("name: \"%s\"" % link_name) + if node_type != None: + node_str_list.append("type: %s" % node_type) + if durable: + node_str_list.append("durable: True") + if link_reliability != None: + node_str_list.append("reliability: %s" % link_reliability) + if len(x_declare_list) > 0: + node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list)) + if len(x_bindings_list) > 0: + node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list)) + if len(x_subscribe_list) > 0: + node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list)) + if node_flag: + opt_str_list.append("node: %s" % self._fmt_map(node_str_list)) + else: + opt_str_list.append("link: %s" % self._fmt_map(node_str_list)) + addr_str = node_name + if node_subject != None: + addr_str += "/%s" % node_subject + if len(opt_str_list) > 0: + addr_str += "; %s" % self._fmt_map(opt_str_list) + return addr_str + + def snd_addr(self, node_name, **kwargs): + """ Create a send (node) address""" + # Get keyword args + topic = kwargs.get("topic") + topic_flag = kwargs.get("topic_flag", False) + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + durable = kwargs.get("durable", False) + exclusive = kwargs.get("exclusive", False) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + exchage_type = kwargs.get("exchage_type") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + node_type = None + if topic != None or topic_flag: + node_type = "topic" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + if exchage_type != None: + x_declare_list.append("type: %s" % exchage_type) + + return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy, + node_type=node_type, durable=durable, x_declare_list=x_declare_list) + + def rcv_addr(self, node_name, **kwargs): + """ Create a receive (link) address""" + # Get keyword args + auto_create = kwargs.get("auto_create", True) + auto_delete = kwargs.get("auto_delete", False) + link_name = kwargs.get("link_name") + durable = kwargs.get("durable", False) + browse = kwargs.get("browse", False) + exclusive = kwargs.get("exclusive", False) + binding_list = kwargs.get("binding_list", []) + ftd_count = kwargs.get("ftd_count") + ftd_size = kwargs.get("ftd_size") + policy = kwargs.get("policy", "flow-to-disk") + + create_policy = None + if auto_create: + create_policy = "always" + delete_policy = None + if auto_delete: + delete_policy = "always" + mode = None + if browse: + mode = "browse" + x_declare_list = ["\"exclusive\": %s" % exclusive] + if ftd_count != None or ftd_size != None: + queue_policy = ["\'qpid.policy_type\': %s" % policy] + if ftd_count: + queue_policy.append("\'qpid.max_count\': %d" % ftd_count) + if ftd_size: + queue_policy.append("\'qpid.max_size\': %d" % ftd_size) + x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) + x_bindings_list = [] + for binding in binding_list: + x_bindings_list.append("{exchange: %s, key: %s}" % binding) + if durable: reliability = 'at-least-once' + else: reliability = None + return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, + link_name=link_name, durable=durable, x_declare_list=x_declare_list, + x_bindings_list=x_bindings_list, link_reliability=reliability) + + def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): + """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" + return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse) + + def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False, + emtpy_flag=False): + """Check that messages is on a queue by dequeuing them and comparing them to the expected messages""" + if emtpy_flag: + num_msgs = 0 + else: + num_msgs = len(exp_msg_list) + ssn = broker.connect().session(transactional=transactional) + rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs) + if num_msgs > 0: + try: + recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)] + except Empty: + self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i)) + for i in range(0, len(recieved_msg_list)): + self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content) + self.assertEqual(recieved_msg_list[i].correlation_id, exp_msg_list[i].correlation_id) + if empty: + self._chk_empty(queue, rcvr) + if ack: + ssn.acknowledge() + if transactional: + ssn.commit() + ssn.connection.close() + else: + if transactional: + ssn.commit() + return ssn + + + # Functions for finding strings in the broker log file (or other files) + + @staticmethod + def _read_file(file_name): + """Returns the content of file named file_name as a string""" + file_handle = file(file_name) + try: + return file_handle.read() + finally: + file_handle.close() + + def _get_hits(self, broker, search): + """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple + queues)""" + # TODO: Use sets when RHEL-4 is no longer supported + hits = [] + for hit in search.findall(self._read_file(broker.log)): + if hit not in hits: + hits.append(hit) + return hits + + def _reconsile_hits(self, broker, ftd_msgs, release_hits): + """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining + release_hits.""" + for msg in ftd_msgs: + found = False + for hit in release_hits: + if str(msg.id) in hit: + release_hits.remove(hit) + #print "Found %s in %s" % (msg.id, broker.log) + found = True + break + if not found: + self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log)) + if len(release_hits) > 0: + err = "Messages were unexpectedly released in log %s:\n" % broker.log + for hit in release_hits: + err += " %s\n" % hit + self.assert_(False, err) + + def check_msg_release(self, broker, ftd_msgs): + """ Check for 'Content released' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_commit(self, broker, ftd_msgs): + """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_release_on_recover(self, broker, ftd_msgs): + """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content released after recovery$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) + + def check_msg_block_on_commit(self, broker, ftd_msgs): + """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" + hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " + "Content release blocked on commit$", re.MULTILINE)) + self._reconsile_hits(broker, ftd_msgs, hits) diff --git a/cpp/src/tests/linearstore/run_long_python_tests b/cpp/src/tests/linearstore/run_long_python_tests new file mode 100644 index 0000000000..be6380302c --- /dev/null +++ b/cpp/src/tests/linearstore/run_long_python_tests @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests LONG_TEST diff --git a/cpp/src/tests/linearstore/run_python_tests b/cpp/src/tests/linearstore/run_python_tests new file mode 100755 index 0000000000..4ff212a71c --- /dev/null +++ b/cpp/src/tests/linearstore/run_python_tests @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +source ${QPID_TEST_COMMON} + +ensure_python_tests + +#Add our directory to the python path +export PYTHONPATH=$srcdir/linearstore:${PYTHONPATH} + +MODULENAME=python_tests + +echo "Running Python tests in module ${MODULENAME}..." + +QPID_PORT=${QPID_PORT:-5672} +FAILING=${FAILING:-/dev/null} +PYTHON_TESTS=${PYTHON_TESTS:-$*} + +OUTDIR=${MODULENAME}.tmp +rm -rf ${OUTDIR} + +# To debug a test, add the following options to the end of the following line: +# -v DEBUG -c qpid.messaging.io.ops [*.testName] +${QPID_PYTHON_TEST} -m ${MODULENAME} -I ${FAILING} -DOUTDIR=${OUTDIR} ${PYTHON_TEST} || exit 1 + diff --git a/cpp/src/tests/linearstore/run_short_python_tests b/cpp/src/tests/linearstore/run_short_python_tests new file mode 100644 index 0000000000..9b9e7c59be --- /dev/null +++ b/cpp/src/tests/linearstore/run_short_python_tests @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +./run_python_tests SHORT_TEST |