summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java542
1 files changed, 78 insertions, 464 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index b5bb74327e..51589c705f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -25,37 +25,25 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
-import java.util.Properties;
-
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
import javax.naming.Context;
-import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.ExecutionErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -199,7 +187,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
}
-
+
+ // todo add tests for delete options
+
public void testCreateQueue() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -212,7 +202,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"durable: true ," +
"x-declare: " +
"{" +
- "exclusive: true," +
+ "auto-delete: true," +
"arguments: {" +
"'qpid.max_size': 1000," +
"'qpid.max_count': 100" +
@@ -228,9 +218,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"}";
AMQDestination dest = new AMQAnyDestination(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
- cons.close();
-
- // Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
@@ -259,44 +246,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
- MessageProducer prod = jmsSession.createProducer(dest);
- prod.send(jmsSession.createTextMessage("test"));
-
- MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue"));
- Message m = cons2.receive(1000);
- assertNotNull("Should receive message sent to my-queue",m);
- assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT));
}
public void testCreateExchange() throws Exception
{
- createExchangeImpl(false, false);
- }
-
- /**
- * Verify creating an exchange via an Address, with supported
- * exchange-declare arguments.
- */
- public void testCreateExchangeWithArgs() throws Exception
- {
- createExchangeImpl(true, false);
- }
-
- /**
- * Verify that when creating an exchange via an Address, if a
- * nonsense argument is specified the broker throws an execution
- * exception back on the session with NOT_IMPLEMENTED status.
- */
- public void testCreateExchangeWithNonsenseArgs() throws Exception
- {
- createExchangeImpl(true, true);
- }
-
- private void createExchangeImpl(final boolean withExchangeArgs,
- final boolean useNonsenseArguments) throws Exception
- {
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-exchange/hello; " +
"{ " +
"create: always, " +
@@ -306,36 +261,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"x-declare: " +
"{ " +
"type:direct, " +
- "auto-delete: true" +
- createExchangeArgsString(withExchangeArgs, useNonsenseArguments) +
+ "auto-delete: true, " +
+ "arguments: {" +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
"}" +
"}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
-
- MessageConsumer cons;
- try
- {
- cons = jmsSession.createConsumer(dest);
- if(useNonsenseArguments)
- {
- fail("Expected execution exception during exchange declare did not occur");
- }
- }
- catch(JMSException e)
- {
- if(useNonsenseArguments && e.getCause().getMessage().contains(ExecutionErrorCode.NOT_IMPLEMENTED.toString()))
- {
- //expected because we used an argument which the broker doesn't have functionality
- //for. We can't do the rest of the test as a result of the exception, just stop.
- return;
- }
- else
- {
- fail("Unexpected exception whilst creating consumer: " + e);
- }
- }
+ MessageConsumer cons = jmsSession.createConsumer(dest);
assertTrue("Exchange not created as expected",(
(AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
@@ -350,66 +286,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons = jmsSession.createConsumer(dest);
}
- private String createExchangeArgsString(final boolean withExchangeArgs,
- final boolean useNonsenseArguments)
- {
- String argsString;
-
- if(withExchangeArgs && useNonsenseArguments)
- {
- argsString = ", arguments: {" +
- "'abcd.1234.wxyz': 1, " +
- "}";
- }
- else if(withExchangeArgs)
- {
- argsString = ", arguments: {" +
- "'qpid.msg_sequence': 1, " +
- "'qpid.ive': 1" +
- "}";
- }
- else
- {
- argsString = "";
- }
-
- return argsString;
- }
-
- public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
- {
- assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), null));
-
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
- dest.getAddressName(),"test", null));
-
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getAddressName(),"a.#", null));
-
- Address a = Address.parse(headersBinding);
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
- dest.getAddressName(),null, a.getOptions()));
- }
-
- /**
- * Test goal: Verifies that a producer and consumer creation triggers the correct
- * behavior for x-bindings specified in node props.
- */
public void testBindQueueWithArgs() throws Exception
{
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
- String addr = "node: " +
+ String addr = "ADDR:my-queue/hello; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
"{" +
"durable: true ," +
"x-declare: " +
@@ -424,14 +310,28 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"}" +
"}";
+ AMQDestination dest = new AMQAnyDestination(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), null));
- AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
- MessageConsumer cons = jmsSession.createConsumer(dest1);
- checkQueueForBindings(jmsSession,dest1,headersBinding);
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ dest.getAddressName(),"test", null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ dest.getAddressName(),"a.#", null));
- AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
- MessageProducer prod = jmsSession.createProducer(dest2);
- checkQueueForBindings(jmsSession,dest2,headersBinding);
+ Address a = Address.parse(headersBinding);
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ dest.getAddressName(),null, a.getOptions()));
}
/**
@@ -567,6 +467,39 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
/**
+ * Test goal: Verifies that and address based destination can be used successfully
+ * as a reply to.
+ */
+ public void testAddressBasedReplyTo() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ "link : {name : 'MY.RESP.QUEUE', " +
+ "x-declare : { auto-delete: true, exclusive: true, " +
+ "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }";
+
+ Destination replyTo = new AMQAnyDestination(addr);
+ Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello");
+
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+ Message m = jmsSession.createTextMessage("Hello");
+ m.setJMSReplyTo(replyTo);
+ prod.send(m);
+
+ Message msg = cons.receive(1000);
+ assertNotNull("consumer should have received the message",msg);
+
+ MessageConsumer replyToCons = jmsSession.createConsumer(replyTo);
+ MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo());
+ replyToProd.send(jmsSession.createTextMessage("reply"));
+
+ Message replyToMsg = replyToCons.receive(1000);
+ assertNotNull("The reply to consumer should have got the message",replyToMsg);
+ }
+
+ /**
* Test goal: Verifies that session.createQueue method
* works as expected both with the new and old addressing scheme.
*/
@@ -587,22 +520,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons.close();
// Using the ADDR method
- // default case
queue = ssn.createQueue("ADDR:my-queue2");
- try
- {
- prod = ssn.createProducer(queue);
- fail("The client should throw an exception, since there is no queue present in the broker");
- }
- catch(Exception e)
- {
- String s = "The name 'my-queue2' supplied in the address " +
- "doesn't resolve to an exchange or a queue";
- assertEquals(s,e.getCause().getCause().getMessage());
- }
-
- // explicit create case
- queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
@@ -629,25 +547,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
/**
- * Test goal: Verifies that session.creatTopic method works as expected
- * both with the new and old addressing scheme.
+ * Test goal: Verifies that session.creatTopic method
+ * works as expected both with the new and old addressing scheme.
*/
public void testSessionCreateTopic() throws Exception
{
- sessionCreateTopicImpl(false);
- }
-
- /**
- * Test goal: Verifies that session.creatTopic method works as expected
- * both with the new and old addressing scheme when adding exchange arguments.
- */
- public void testSessionCreateTopicWithExchangeArgs() throws Exception
- {
- sessionCreateTopicImpl(true);
- }
-
- private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception
- {
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// Using the BURL method
@@ -667,7 +571,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
String addr = "ADDR:vehicles/bus; " +
"{ " +
"create: always, " +
@@ -677,8 +581,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"x-declare: " +
"{ " +
"type:direct, " +
- "auto-delete: true" +
- createExchangeArgsString(withExchangeArgs, false) +
+ "auto-delete: true, " +
+ "arguments: {" +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
"}" +
"}, " +
"link: {name : my-topic, " +
@@ -790,7 +697,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSubscriptionForSameDestination() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
+ Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
@@ -889,297 +796,4 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
}
}
-
- public void testQueueReceiversAndTopicSubscriber() throws Exception
- {
- Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
- Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
-
- QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- QueueReceiver receiver = qSession.createReceiver(queue);
-
- TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber sub = tSession.createSubscriber(topic);
-
- Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
- prod1.send(ssn.createTextMessage("test1"));
-
- MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
- prod2.send(ssn.createTextMessage("test2"));
-
- Message msg1 = receiver.receive();
- assertNotNull(msg1);
- assertEquals("test1",((TextMessage)msg1).getText());
-
- Message msg2 = sub.receive();
- assertNotNull(msg2);
- assertEquals("test2",((TextMessage)msg2).getText());
- }
-
- public void testDurableSubscriber() throws Exception
- {
- Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
- Properties props = new Properties();
- props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- props.setProperty("destination.address1", "ADDR:amq.topic");
- props.setProperty("destination.address2", "ADDR:amq.direct/test");
- String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
- "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
- props.setProperty("destination.address3", addrStr);
- props.setProperty("topic.address4", "hello.world");
- addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
- props.setProperty("destination.address5", addrStr);
-
- Context ctx = new InitialContext(props);
-
- for (int i=1; i < 5; i++)
- {
- Topic topic = (Topic) ctx.lookup("address"+i);
- createDurableSubscriber(ctx,ssn,"address"+i,topic);
- }
-
- Topic topic = ssn.createTopic("ADDR:news.us");
- createDurableSubscriber(ctx,ssn,"my-dest",topic);
-
- Topic namedQueue = (Topic) ctx.lookup("address5");
- try
- {
- createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
- fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
- }
- catch(JMSException e)
- {
- assertEquals("Durable subscribers can only be created for Topics",
- e.getMessage());
- }
- }
-
- private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
- {
- MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
- MessageProducer prod = ssn.createProducer(topic);
-
- Message m = ssn.createTextMessage(destName);
- prod.send(m);
- Message msg = cons.receive(1000);
- assertNotNull(msg);
- assertEquals(destName,((TextMessage)msg).getText());
- ssn.unsubscribe(destName);
- }
-
- public void testDeleteOptions() throws Exception
- {
- Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons;
-
- // default (create never, assert never) -------------------
- // create never --------------------------------------------
- String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
- AMQDestination dest = new AMQAnyDestination(addr1);
- try
- {
- cons = jmsSession.createConsumer(dest);
- cons.close();
- }
- catch(JMSException e)
- {
- fail("Exception should not be thrown. Exception thrown is : " + e);
- }
-
- assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
- String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
- dest = new AMQAnyDestination(addr2);
- try
- {
- cons = jmsSession.createConsumer(dest);
- cons.close();
- }
- catch(JMSException e)
- {
- fail("Exception should not be thrown. Exception thrown is : " + e);
- }
-
- assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
- String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
- dest = new AMQAnyDestination(addr3);
- try
- {
- cons = jmsSession.createConsumer(dest);
- MessageProducer prod = jmsSession.createProducer(dest);
- prod.close();
- }
- catch(JMSException e)
- {
- fail("Exception should not be thrown. Exception thrown is : " + e);
- }
-
- assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
- }
-
- /**
- * Test Goals : 1. Test if the client sets the correct accept mode for unreliable
- * and at-least-once.
- * 2. Test default reliability modes for Queues and Topics.
- * 3. Test if an exception is thrown if exactly-once is used.
- * 4. Test if an exception is thrown if at-least-once is used with topics.
- *
- * Test Strategy: For goal #1 & #2
- * For unreliable and at-least-once the test tries to receives messages
- * in client_ack mode but does not ack the messages.
- * It will then close the session, recreate a new session
- * and will then try to verify the queue depth.
- * For unreliable the messages should have been taken off the queue.
- * For at-least-once the messages should be put back onto the queue.
- *
- */
-
- public void testReliabilityOptions() throws Exception
- {
- String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
- acceptModeTest(addr1,0);
-
- String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
- acceptModeTest(addr2,2);
-
- // Default accept-mode for topics
- acceptModeTest("ADDR:amq.topic/test",0);
-
- // Default accept-mode for queues
- acceptModeTest("ADDR:testQueue1;{create: always}",2);
-
- String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
- try
- {
- AMQAnyDestination dest = new AMQAnyDestination(addr3);
- fail("An exception should be thrown indicating it's an unsupported type");
- }
- catch(Exception e)
- {
- assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
- }
-
- String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
- try
- {
- AMQAnyDestination dest = new AMQAnyDestination(addr4);
- Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons = ssn.createConsumer(dest);
- fail("An exception should be thrown indicating it's an unsupported combination");
- }
- catch(Exception e)
- {
- assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
- }
- }
-
- private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
- {
- Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons;
- MessageProducer prod;
-
- AMQDestination dest = new AMQAnyDestination(address);
- cons = ssn.createConsumer(dest);
- prod = ssn.createProducer(dest);
-
- for (int i=0; i < expectedQueueDepth; i++)
- {
- prod.send(ssn.createTextMessage("Msg" + i));
- }
-
- for (int i=0; i < expectedQueueDepth; i++)
- {
- Message msg = cons.receive(1000);
- assertNotNull(msg);
- assertEquals("Msg" + i,((TextMessage)msg).getText());
- }
-
- ssn.close();
- ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
- assertEquals(expectedQueueDepth,queueDepth);
- cons.close();
- prod.close();
- }
-
- public void testDestinationOnSend() throws Exception
- {
- Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test"));
- MessageProducer prod = ssn.createProducer(null);
-
- Queue queue = ssn.createQueue("amq.topic/test");
- prod.send(queue,ssn.createTextMessage("A"));
-
- Message msg = cons.receive(1000);
- assertNotNull(msg);
- assertEquals("A",((TextMessage)msg).getText());
- prod.close();
- cons.close();
- }
-
- public void testReplyToWithNamelessExchange() throws Exception
- {
- System.setProperty("qpid.declare_exchanges","false");
- replyToTest("ADDR:my-queue;{create: always}");
- System.setProperty("qpid.declare_exchanges","true");
- }
-
- public void testReplyToWithCustomExchange() throws Exception
- {
- replyToTest("ADDR:hello;{create:always,node:{type:topic}}");
- }
-
- private void replyToTest(String replyTo) throws Exception
- {
- Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination replyToDest = AMQDestination.createDestination(replyTo);
- MessageConsumer replyToCons = session.createConsumer(replyToDest);
-
- Destination dest = session.createQueue("amq.direct/test");
-
- MessageConsumer cons = session.createConsumer(dest);
- MessageProducer prod = session.createProducer(dest);
- Message m = session.createTextMessage("test");
- m.setJMSReplyTo(replyToDest);
- prod.send(m);
-
- Message msg = cons.receive();
- MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
- prodR.send(session.createTextMessage("x"));
-
- Message m1 = replyToCons.receive();
- assertNotNull("The reply to consumer should have received the messsage",m1);
- }
-
- public void testAltExchangeInAddressString() throws Exception
- {
- String addr1 = "ADDR:my-exchange/test; {create: always, node:{type: topic,x-declare:{alternate-exchange:'amq.fanout'}}}";
- Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String altQueueAddr = "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'amq.fanout'}] }}";
- MessageConsumer cons = session.createConsumer(session.createQueue(altQueueAddr));
-
- MessageProducer prod = session.createProducer(session.createTopic(addr1));
- prod.send(session.createMessage());
- prod.close();
- assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
-
- String addr2 = "ADDR:test-queue;{create:sender, delete: sender,node:{type:queue,x-declare:{alternate-exchange:'amq.fanout'}}}";
- prod = session.createProducer(session.createTopic(addr2));
- prod.send(session.createMessage());
- prod.close();
- assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
- cons.close();
- }
}