summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java200
1 files changed, 156 insertions, 44 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index e1f93b975b..22a98b6f42 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -29,8 +29,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -98,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,false));
// create always -------------------------------------------
@@ -107,10 +105,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
@@ -126,16 +124,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
@@ -161,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -177,14 +175,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"doesn't resolve to an exchange or a queue"));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
}
@@ -221,7 +219,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -326,7 +324,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertTrue("Exchange not created as expected",(
- (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
+ (AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
@@ -367,7 +365,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
{
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -506,14 +504,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
assertTrue("Destination1 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
assertTrue("Destination1 was not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
assertTrue("Destination2 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
assertTrue("Destination2 was not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -602,14 +600,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons.close();
// Using the ADDR method to create a more complicated queue
- String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ String addr = "ADDR:amq.direct/x512; {" +
"link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
"arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
queue = ssn.createQueue(addr);
- prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
+ prod = ssn.createProducer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
(AMQSession_0_10)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
@@ -677,8 +675,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Using the ADDR method to create a more complicated topic
topic = ssn.createTopic(addr);
- prod = ssn.createProducer(topic);
cons = ssn.createConsumer(topic);
+ prod = ssn.createProducer(topic);
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
(AMQSession_0_10)ssn).isQueueBound("vehicles",
@@ -778,7 +776,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSubscriptionForSameDestination() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
+ Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
@@ -840,7 +838,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"}";
// Using the ADDR method to create a more complicated topic
- MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr));
+ Topic topic = ssn.createTopic(addr);
+ MessageConsumer cons = ssn.createConsumer(topic);
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
(AMQSession_0_10)ssn).isQueueBound("MRKT",
@@ -854,7 +853,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
- MessageProducer prod = ssn.createProducer(ssn.createTopic(addr));
+ MessageProducer prod = ssn.createProducer(topic);
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
prod.send(msg);
@@ -909,32 +908,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
+
Properties props = new Properties();
props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- props.setProperty("destination.address1", "ADDR:amq.topic");
- props.setProperty("destination.address2", "ADDR:amq.direct/test");
- String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
- "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
- props.setProperty("destination.address3", addrStr);
- props.setProperty("topic.address4", "hello.world");
- addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ props.setProperty("destination.address1", "ADDR:amq.topic/test");
+ props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
+ props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
+ String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
props.setProperty("destination.address5", addrStr);
Context ctx = new InitialContext(props);
- for (int i=1; i < 5; i++)
+ for (int i=1; i < 4; i++)
{
Topic topic = (Topic) ctx.lookup("address"+i);
- createDurableSubscriber(ctx,ssn,"address"+i,topic);
+ createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test");
}
Topic topic = ssn.createTopic("ADDR:news.us");
- createDurableSubscriber(ctx,ssn,"my-dest",topic);
+ createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us");
Topic namedQueue = (Topic) ctx.lookup("address5");
try
{
- createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
+ createDurableSubscriber(ctx,ssn,"my-queue",namedQueue,"ADDR:amq.topic/test");
fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
}
catch(JMSException e)
@@ -943,16 +941,74 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
e.getMessage());
}
}
-
- private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
+
+ public void testDurableSubscription() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName());
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic, getTestQueueName());
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ }
+
+ public void testDurableSubscriptionnWithSelector() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false);
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false);
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+ }
+
+ private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception
{
MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
- MessageProducer prod = ssn.createProducer(topic);
+ MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr));
Message m = ssn.createTextMessage(destName);
prod.send(m);
Message msg = cons.receive(1000);
- assertNotNull(msg);
+ assertNotNull("Message not received as expected when using Topic : " + topic,msg);
assertEquals(destName,((TextMessage)msg).getText());
ssn.unsubscribe(destName);
}
@@ -977,7 +1033,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -993,7 +1049,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1010,9 +1066,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
}
/**
@@ -1094,7 +1148,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
MessageProducer prod = ssn.createProducer(null);
- Queue queue = ssn.createQueue("ADDR:amq.topic/test");
+ Topic queue = ssn.createTopic("ADDR:amq.topic/test");
prod.send(queue,ssn.createTextMessage("A"));
Message msg = cons.receive(1000);
@@ -1307,4 +1361,62 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("message should be re-received by consumer after rollback", receivedMessage);
jmsSession.commit();
}
+
+ /**
+ * Test Goals :
+ *
+ * 1. Verify that link bindings are created and destroyed after creating and closing a subscriber.
+ * 2. Verify that link bindings are created and destroyed after creating and closing a subscriber.
+ */
+ public void testLinkBindingBehavior() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String addr = "ADDR:my-queue; {create: always, " +
+ "link: " +
+ "{" +
+ "x-bindings: [{exchange : 'amq.direct', key : test}]," +
+ "}" +
+ "}";
+
+ AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+ AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession;
+
+ assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true));
+ assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
+
+ cons.close(); // closing consumer, link binding should be removed now.
+ assertTrue("Queue should still be there",ssn.isQueueExist(dest, true));
+ assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null));
+
+ MessageProducer prod = jmsSession.createProducer(dest);
+ assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
+ prod.close();
+ assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null));
+ }
+
+ /**
+ * Test Goals : Verifies that the subscription queue created is as specified under link properties.
+ */
+ public void testCustomizingSubscriptionQueue() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," +
+ "alternate-exchange: 'amq.fanout'," +
+ "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" +
+ "}";
+
+ String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}";
+ Destination dest = ssn.createTopic(addr);
+ MessageConsumer cons = ssn.createConsumer(dest);
+
+ String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}";
+ AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr);
+ ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+
+ // Verify that the producer does not delete the subscription queue.
+ MessageProducer prod = ssn.createProducer(dest);
+ prod.close();
+ ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ }
}