diff options
author | Alex Rudyy <orudyy@apache.org> | 2012-10-24 00:05:45 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2012-10-24 00:05:45 +0000 |
commit | f47c28c9ee2fb8f2967a221a28912060edcba749 (patch) | |
tree | 5362298530cbdf8ded409298f50a709ef038a2f8 | |
parent | a75bb8770d06f1013578bd59261a6a96b18bcf71 (diff) | |
download | qpid-python-f47c28c9ee2fb8f2967a221a28912060edcba749.tar.gz |
QPID-4389: Send the selector of durable subscriber in arguments of ExchangeBind command
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401515 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 19 | ||||
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java | 60 |
2 files changed, 71 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e271436c21..12b174198a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -624,7 +625,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) { - createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal()); + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); queueName = consumer.getDestination().getAMQQueueName(); consumer.setQueuename(queueName); } @@ -1300,8 +1303,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } } - - void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException + + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { Link link = dest.getLink(); String queueName = dest.getQueueName(); @@ -1325,12 +1328,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic link.isDurable() ? Option.DURABLE : Option.NONE, queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); getQpidSession().exchangeBind(queueName, - dest.getAddressName(), - dest.getSubject(), - Collections.<String,Object>emptyMap()); + dest.getAddressName(), + dest.getSubject(), + bindingArguments); } - + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 08ee70f072..974e8d6e96 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -941,7 +941,65 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase e.getMessage()); } } - + + public void testDurableSubscription() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + + TextMessage messageToSend = session.createTextMessage("Test0"); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + } + + public void testDurableSubscriptionnWithSelector() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + + TextMessage messageToSend = session.createTextMessage("Test0"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession<?,?>)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + } + private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); |