diff options
author | Aidan Skinner <aidan@apache.org> | 2008-08-19 10:03:07 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-08-19 10:03:07 +0000 |
commit | 4de98efcdae4926ac062397cabeeed59a35beddd (patch) | |
tree | 2d188d8e7744dd7bd7dc06ffc297dfaf9e0836d5 | |
parent | e4540065984e2a791a3869826e0c03d596fce7eb (diff) | |
download | qpid-python-4de98efcdae4926ac062397cabeeed59a35beddd.tar.gz |
QPID-1202: Rebind durable subscriptions if the arguments have changed
TopicExchange: take field arguments into account when determining if topic binding already exists when binding, but not for regular isBound().
DurableSubscriptionTest: add test case for QPID-1202
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687010 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 33 | ||||
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | 60 |
2 files changed, 85 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index c18cc337fe..59a8339346 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -115,11 +115,13 @@ public class TopicExchange extends AbstractExchange { private final AMQShortString _bindingKey; private final AMQQueue _queue; + private final FieldTable _args; - public Binding(AMQShortString bindingKey, AMQQueue queue) + public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args) { _bindingKey = bindingKey; _queue = queue; + _args = args; } public AMQShortString getBindingKey() @@ -134,7 +136,7 @@ public class TopicExchange extends AbstractExchange public int hashCode() { - return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode(); + return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode(); } public boolean equals(Object o) @@ -382,7 +384,7 @@ public class TopicExchange extends AbstractExchange routingKey = rKey; } - Binding binding = new Binding(rKey, queue); + Binding binding = new Binding(rKey, queue, args); if(_bindings.containsKey(binding)) { @@ -544,14 +546,29 @@ public class TopicExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) { - return isBound(routingKey, queue); + Binding binding = new Binding(routingKey, queue, arguments); + if (arguments == null) + { + return _bindings.containsKey(binding); + } + else + { + FieldTable o = _bindings.get(binding); + if (o != null) + { + return o.equals(arguments); + } + else + { + return false; + } + + } } public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - Binding binding = new Binding(routingKey, queue); - - return _bindings.containsKey(binding); + return isBound(routingKey, null, queue); } public boolean isBound(AMQShortString routingKey) @@ -590,7 +607,7 @@ public class TopicExchange extends AbstractExchange assert queue != null; assert rKey != null; - Binding binding = new Binding(rKey, queue); + Binding binding = new Binding(rKey, queue, args); if (!_bindings.containsKey(binding)) diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index fe7b97a47d..03f380243e 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -348,6 +348,66 @@ public class DurableSubscriptionTest extends QpidTestCase session.unsubscribe("testDurableWithInvalidDestinationsub"); } + /** + * Tests QPID-1202 + * Creates a durable subscription with a selector, then changes that selector on resubscription + * @throws Exception + */ + public void testResubscribeWithChangedSelector() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector"); + MessageProducer producer = session.createProducer(topic); + + // Create durable subscriber that matches A + TopicSubscriber subA = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelector", + "Match = True", false); + + // Send 1 matching message and 1 non-matching message + sendMatchingAndNonMatchingMessage(session, producer); + + Message rMsg = subA.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelector1", + ((TextMessage) rMsg).getText()); + + rMsg = subA.receive(250); + assertNull(rMsg); + + // Disconnect subscriber + subA.close(); + + // Reconnect with new selector that matches B + TopicSubscriber subB = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelector","Match = False", false); + + + // Check messages are recieved properly + sendMatchingAndNonMatchingMessage(session, producer); + rMsg = subB.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelector2", + ((TextMessage) rMsg).getText()); + + rMsg = subB.receive(250); + assertNull(rMsg); + } + + private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException + { + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelector2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DurableSubscriptionTest.class); |