diff options
Diffstat (limited to 'tests/src/py/qpid_tests/broker_0_8/basic.py')
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_8/basic.py | 47 |
1 files changed, 46 insertions, 1 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_8/basic.py b/tests/src/py/qpid_tests/broker_0_8/basic.py index d5837fc19c..606aad1293 100644 --- a/tests/src/py/qpid_tests/broker_0_8/basic.py +++ b/tests/src/py/qpid_tests/broker_0_8/basic.py @@ -79,6 +79,51 @@ class BasicTests(TestBase): except Closed, e: self.assertChannelException(403, e.args[0]) + def test_reconnect_to_durable_subscription(self): + try: + publisherchannel = self.channel + my_id = "my_id" + consumer_connection_properties_with_instance = {"instance": my_id} + queue_for_subscription = "queue_for_subscription_%s" % my_id + topic_name = "my_topic_name" + test_message = self.uniqueString() + + durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance) + consumerchannel = durable_subscription_client.channel(1) + consumerchannel.channel_open() + + self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name) + + # disconnect + durable_subscription_client.close() + + # send message to topic + publisherchannel.basic_publish(routing_key=topic_name, exchange="amq.topic", content=Content(test_message)) + + # reconnect and consume message + durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance) + consumerchannel = durable_subscription_client.channel(1) + consumerchannel.channel_open() + + self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name) + + # Create consumer and consume the message that was sent whilst subscriber was disconnected. By convention we + # declare the consumer as exclusive to forbid concurrent access. + subscription = consumerchannel.basic_consume(queue=queue_for_subscription, exclusive=True) + queue = durable_subscription_client.queue(subscription.consumer_tag) + + # consume and verify message content + msg = queue.get(timeout=1) + self.assertEqual(test_message, msg.content.body) + consumerchannel.basic_ack(delivery_tag=msg.delivery_tag) + finally: + publisherchannel.queue_delete(queue=queue_for_subscription) + durable_subscription_client.close() + + def _declare_and_bind_exclusive_queue_on_topic_exchange(self, channel, queue, topic_name): + channel.queue_declare(queue=queue, exclusive=True, auto_delete=False, durable=True) + channel.queue_bind(exchange="amq.topic", queue=queue, routing_key=topic_name) + def test_consume_queue_errors(self): """ Test error conditions associated with the queue field of the consume method: @@ -129,7 +174,7 @@ class BasicTests(TestBase): myqueue = self.client.queue("my-consumer") msg = myqueue.get(timeout=1) self.assertEqual("One", msg.content.body) - + #cancel should stop messages being delivered channel.basic_cancel(consumer_tag="my-consumer") channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) |