# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpid.tests.messaging.implementation import * from qpid.tests.messaging import VersionTest class MiscellaneousTests (VersionTest): """ Tests for various aspects of qpidd behaviour """ def test_exclusive(self): con = self.create_connection("amqp1.0", True) rcv = con.session().receiver("q; {create:always, node:{properties:{exclusive:True,auto-delete:True}}}") other = self.create_connection("amqp1.0", True) try: #can send to the queue snd = other.session().sender("q") #can browse the queue browser = other.session().receiver("q; {mode:browse}") #can't consume from the queue try: consumer = other.session().receiver("q") assert False, ("Should not be able to consume from exclusively owned queue") except LinkError, e: None try: exclusive = other.session().receiver("q; {create: always, node:{properties:{exclusive:True}}}") assert False, ("Should not be able to consume exclusively from exclusively owned queue") except LinkError, e: None finally: rcv.close() con.close() other.close() class AutoDeleteExchangeTests(VersionTest): def init_test(self, exchange_type="topic"): rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type) snd = self.ssn.sender("my-topic") #send some messages msgs = [Message(content=c) for c in ['a','b','c','d']] for m in msgs: snd.send(m) #verify receipt for expected in msgs: msg = rcv.fetch(0) assert msg.content == expected.content self.ssn.acknowledge(msg) return (rcv, snd) def on_rcv_detach_test(self, exchange_type="topic"): rcv, snd = self.init_test(exchange_type) rcv.close() #verify exchange is still there snd.send(Message(content="will be dropped")) snd.close() #now verify it is no longer there try: self.ssn.sender("my-topic") assert False, "Attempt to send to deleted exchange should fail" except MessagingError: None def on_snd_detach_test(self, exchange_type="topic"): rcv, snd = self.init_test(exchange_type) snd.close() #verify exchange is still there snd = self.ssn.sender("my-topic") snd.send(Message(content="will be dropped")) snd.close() rcv.close() #now verify it is no longer there try: self.ssn.sender("my-topic") assert False, "Attempt to send to deleted exchange should fail" except MessagingError: None def test_autodelete_fanout_exchange_on_rcv_detach(self): self.on_rcv_detach_test("fanout") def test_autodelete_fanout_exchange_on_snd_detach(self): self.on_snd_detach_test("fanout") def test_autodelete_direct_exchange_on_rcv_detach(self): self.on_rcv_detach_test("direct") def test_autodelete_direct_exchange_on_snd_detach(self): self.on_snd_detach_test("direct") def test_autodelete_topic_exchange_on_rcv_detach(self): self.on_rcv_detach_test("topic") def test_autodelete_topic_exchange_on_snd_detach(self): self.on_snd_detach_test("topic") def test_autodelete_headers_exchange_on_rcv_detach(self): self.on_rcv_detach_test("headers") def test_autodelete_headers_exchange_on_snd_detach(self): self.on_snd_detach_test("headers")