summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2012-10-24 00:05:45 +0000
committerAlex Rudyy <orudyy@apache.org>2012-10-24 00:05:45 +0000
commitf47c28c9ee2fb8f2967a221a28912060edcba749 (patch)
tree5362298530cbdf8ded409298f50a709ef038a2f8
parenta75bb8770d06f1013578bd59261a6a96b18bcf71 (diff)
downloadqpid-python-f47c28c9ee2fb8f2967a221a28912060edcba749.tar.gz
QPID-4389: Send the selector of durable subscriber in arguments of ExchangeBind command
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401515 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java19
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java60
2 files changed, 71 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index e271436c21..12b174198a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -624,7 +625,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
{
- createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal());
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
queueName = consumer.getDestination().getAMQQueueName();
consumer.setQueuename(queueName);
}
@@ -1300,8 +1303,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
}
-
- void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
+
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
Link link = dest.getLink();
String queueName = dest.getQueueName();
@@ -1325,12 +1328,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
link.isDurable() ? Option.DURABLE : Option.NONE,
queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
getQpidSession().exchangeBind(queueName,
- dest.getAddressName(),
- dest.getSubject(),
- Collections.<String,Object>emptyMap());
+ dest.getAddressName(),
+ dest.getSubject(),
+ bindingArguments);
}
-
+
public void setLegacyFieldsForQueueType(AMQDestination dest)
{
// legacy support
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 08ee70f072..974e8d6e96 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -941,7 +941,65 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
e.getMessage());
}
}
-
+
+ public void testDurableSubscription() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName());
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic, getTestQueueName());
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ }
+
+ public void testDurableSubscriptionnWithSelector() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false);
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false);
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+ }
+
private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception
{
MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);