summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-19 10:03:07 +0000
committerAidan Skinner <aidan@apache.org>2008-08-19 10:03:07 +0000
commit4de98efcdae4926ac062397cabeeed59a35beddd (patch)
tree2d188d8e7744dd7bd7dc06ffc297dfaf9e0836d5
parente4540065984e2a791a3869826e0c03d596fce7eb (diff)
downloadqpid-python-4de98efcdae4926ac062397cabeeed59a35beddd.tar.gz
QPID-1202: Rebind durable subscriptions if the arguments have changed
TopicExchange: take field arguments into account when determining if topic binding already exists when binding, but not for regular isBound(). DurableSubscriptionTest: add test case for QPID-1202 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687010 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java60
2 files changed, 85 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index c18cc337fe..59a8339346 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -115,11 +115,13 @@ public class TopicExchange extends AbstractExchange
{
private final AMQShortString _bindingKey;
private final AMQQueue _queue;
+ private final FieldTable _args;
- public Binding(AMQShortString bindingKey, AMQQueue queue)
+ public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
{
_bindingKey = bindingKey;
_queue = queue;
+ _args = args;
}
public AMQShortString getBindingKey()
@@ -134,7 +136,7 @@ public class TopicExchange extends AbstractExchange
public int hashCode()
{
- return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode();
+ return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode();
}
public boolean equals(Object o)
@@ -382,7 +384,7 @@ public class TopicExchange extends AbstractExchange
routingKey = rKey;
}
- Binding binding = new Binding(rKey, queue);
+ Binding binding = new Binding(rKey, queue, args);
if(_bindings.containsKey(binding))
{
@@ -544,14 +546,29 @@ public class TopicExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
- return isBound(routingKey, queue);
+ Binding binding = new Binding(routingKey, queue, arguments);
+ if (arguments == null)
+ {
+ return _bindings.containsKey(binding);
+ }
+ else
+ {
+ FieldTable o = _bindings.get(binding);
+ if (o != null)
+ {
+ return o.equals(arguments);
+ }
+ else
+ {
+ return false;
+ }
+
+ }
}
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- Binding binding = new Binding(routingKey, queue);
-
- return _bindings.containsKey(binding);
+ return isBound(routingKey, null, queue);
}
public boolean isBound(AMQShortString routingKey)
@@ -590,7 +607,7 @@ public class TopicExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- Binding binding = new Binding(rKey, queue);
+ Binding binding = new Binding(rKey, queue, args);
if (!_bindings.containsKey(binding))
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index fe7b97a47d..03f380243e 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -348,6 +348,66 @@ public class DurableSubscriptionTest extends QpidTestCase
session.unsubscribe("testDurableWithInvalidDestinationsub");
}
+ /**
+ * Tests QPID-1202
+ * Creates a durable subscription with a selector, then changes that selector on resubscription
+ * @throws Exception
+ */
+ public void testResubscribeWithChangedSelector() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector");
+ MessageProducer producer = session.createProducer(topic);
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subA = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelector",
+ "Match = True", false);
+
+ // Send 1 matching message and 1 non-matching message
+ sendMatchingAndNonMatchingMessage(session, producer);
+
+ Message rMsg = subA.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelector1",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subA.receive(250);
+ assertNull(rMsg);
+
+ // Disconnect subscriber
+ subA.close();
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subB = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelector","Match = False", false);
+
+
+ // Check messages are recieved properly
+ sendMatchingAndNonMatchingMessage(session, producer);
+ rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelector2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(250);
+ assertNull(rMsg);
+ }
+
+ private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException
+ {
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelector2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);