diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java | 200 |
1 files changed, 156 insertions, 44 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index e1f93b975b..22a98b6f42 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -29,8 +29,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -98,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); // create always ------------------------------------------- @@ -107,10 +105,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; @@ -126,16 +124,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; @@ -161,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -177,14 +175,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue")); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); } @@ -221,7 +219,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -326,7 +324,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true)); + (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( @@ -367,7 +365,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception { assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -506,14 +504,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); assertTrue("Destination1 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); assertTrue("Destination2 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -602,14 +600,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons.close(); // Using the ADDR method to create a more complicated queue - String addr = "ADDR:amq.direct/x512; {create: receiver, " + + String addr = "ADDR:amq.direct/x512; {" + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); + prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); @@ -677,8 +675,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); - prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); + prod = ssn.createProducer(topic); assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( (AMQSession_0_10)ssn).isQueueBound("vehicles", @@ -778,7 +776,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSubscriptionForSameDestination() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}"); + Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); @@ -840,7 +838,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}"; // Using the ADDR method to create a more complicated topic - MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr)); + Topic topic = ssn.createTopic(addr); + MessageConsumer cons = ssn.createConsumer(topic); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( (AMQSession_0_10)ssn).isQueueBound("MRKT", @@ -854,7 +853,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - MessageProducer prod = ssn.createProducer(ssn.createTopic(addr)); + MessageProducer prod = ssn.createProducer(topic); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); prod.send(msg); @@ -909,32 +908,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; + Properties props = new Properties(); props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); - props.setProperty("destination.address1", "ADDR:amq.topic"); - props.setProperty("destination.address2", "ADDR:amq.direct/test"); - String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," + - "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; - props.setProperty("destination.address3", addrStr); - props.setProperty("topic.address4", "hello.world"); - addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + props.setProperty("destination.address1", "ADDR:amq.topic/test"); + props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); + props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); + String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; props.setProperty("destination.address5", addrStr); Context ctx = new InitialContext(props); - for (int i=1; i < 5; i++) + for (int i=1; i < 4; i++) { Topic topic = (Topic) ctx.lookup("address"+i); - createDurableSubscriber(ctx,ssn,"address"+i,topic); + createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test"); } Topic topic = ssn.createTopic("ADDR:news.us"); - createDurableSubscriber(ctx,ssn,"my-dest",topic); + createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us"); Topic namedQueue = (Topic) ctx.lookup("address5"); try { - createDurableSubscriber(ctx,ssn,"my-queue",namedQueue); + createDurableSubscriber(ctx,ssn,"my-queue",namedQueue,"ADDR:amq.topic/test"); fail("Exception should be thrown. Durable subscribers cannot be created for Queues"); } catch(JMSException e) @@ -943,16 +941,74 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase e.getMessage()); } } - - private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception + + public void testDurableSubscription() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + + TextMessage messageToSend = session.createTextMessage("Test0"); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + } + + public void testDurableSubscriptionnWithSelector() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + + TextMessage messageToSend = session.createTextMessage("Test0"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + } + + private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); Message m = ssn.createTextMessage(destName); prod.send(m); Message msg = cons.receive(1000); - assertNotNull(msg); + assertNotNull("Message not received as expected when using Topic : " + topic,msg); assertEquals(destName,((TextMessage)msg).getText()); ssn.unsubscribe(destName); } @@ -977,7 +1033,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -993,7 +1049,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1010,9 +1066,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); } /** @@ -1094,7 +1148,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - Queue queue = ssn.createQueue("ADDR:amq.topic/test"); + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); Message msg = cons.receive(1000); @@ -1307,4 +1361,62 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("message should be re-received by consumer after rollback", receivedMessage); jmsSession.commit(); } + + /** + * Test Goals : + * + * 1. Verify that link bindings are created and destroyed after creating and closing a subscriber. + * 2. Verify that link bindings are created and destroyed after creating and closing a subscriber. + */ + public void testLinkBindingBehavior() throws Exception + { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String addr = "ADDR:my-queue; {create: always, " + + "link: " + + "{" + + "x-bindings: [{exchange : 'amq.direct', key : test}]," + + "}" + + "}"; + + AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr); + MessageConsumer cons = jmsSession.createConsumer(dest); + AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession; + + assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true)); + assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); + + cons.close(); // closing consumer, link binding should be removed now. + assertTrue("Queue should still be there",ssn.isQueueExist(dest, true)); + assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null)); + + MessageProducer prod = jmsSession.createProducer(dest); + assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); + prod.close(); + assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null)); + } + + /** + * Test Goals : Verifies that the subscription queue created is as specified under link properties. + */ + public void testCustomizingSubscriptionQueue() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" + + "}"; + + String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}"; + Destination dest = ssn.createTopic(addr); + MessageConsumer cons = ssn.createConsumer(dest); + + String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; + AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr); + ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + + // Verify that the producer does not delete the subscription queue. + MessageProducer prod = ssn.createProducer(dest); + prod.close(); + ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + } } |