diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-29 23:47:22 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-29 23:47:22 +0000 |
commit | e7d54437156176ffac3b576749ec2e91bcd55abb (patch) | |
tree | ddfc9ae11c850a8591c0303d22fc160b7723dc98 | |
parent | 3fb61f5354df0b77325e4fc88aa59213d3000a8e (diff) | |
download | qpid-python-e7d54437156176ffac3b576749ec2e91bcd55abb.tar.gz |
QPID-3605 : renamed method, corrected brace style for ifs, added tests (per Robbies review comments)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295341 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 94 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 714953baec..27166e4384 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -127,11 +127,13 @@ public class TopicExchange extends AbstractExchange { if(argumentsContainFilter(oldArgs)) { - result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue)); + result.replaceQueueFilter(queue, + createMessageFilter(oldArgs, queue), + createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue,createSelectorFilter(args,queue)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } @@ -140,7 +142,7 @@ public class TopicExchange extends AbstractExchange if(argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue)); + result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); } else { @@ -161,7 +163,7 @@ public class TopicExchange extends AbstractExchange result = new TopicExchangeResult(); if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args, queue)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); } else { @@ -174,7 +176,7 @@ public class TopicExchange extends AbstractExchange { if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args, queue)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); } else { @@ -188,7 +190,7 @@ public class TopicExchange extends AbstractExchange } - private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException + private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException { if(argumentsContainNoLocal(args)) { @@ -381,7 +383,7 @@ public class TopicExchange extends AbstractExchange { try { - result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue())); + result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -478,8 +480,15 @@ public class TopicExchange extends AbstractExchange @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } NoLocalFilter that = (NoLocalFilter) o; @@ -512,15 +521,25 @@ public class TopicExchange extends AbstractExchange @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } CompoundFilter that = (CompoundFilter) o; if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { return false; + } if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { return false; + } return true; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java index d5cbaaa203..9dfd3c912a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -42,7 +42,7 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName(); protected static final int SEND_COUNT = 10; - public void test() throws Exception + public void testNoLocalNotQueued() throws Exception { if(!isBrokerStorePersistent()) { @@ -73,6 +73,8 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase assertEquals("Normal Subscriber Received no messages", SEND_COUNT, received.size()); session.commit(); + + normalSubscriber.close(); connection.close(); //We didn't receive the messages on the durable queue for the no-local subscriber @@ -94,6 +96,67 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase session2.commit(); assertEquals("No Local Subscriber Received messages", 0, received.size()); + noLocalSubscriber2.close(); + + + } + + + public void testNonNoLocalQueued() throws Exception + { + if(!isBrokerStorePersistent()) + { + fail("This test requires a broker with a persistent store"); + } + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); + + TopicSubscriber noLocalSubscriber = + session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true); + + + sendMessage(session, topic, SEND_COUNT); + + // Check messages can be received as expected. + connection.start(); + + List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT); + assertEquals("No Local Subscriber Received messages", 0, received.size()); + + + + session.commit(); + + Connection connection3 = getConnection(); + Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(session3, topic, SEND_COUNT); + + + connection.close(); + + //We didn't receive the messages on the durable queue for the no-local subscriber + //so they are still on the broker. Restart the broker, prompting their recovery. + restartBroker(); + + Connection connection2 = getConnection(); + connection2.start(); + + Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED); + Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME); + + TopicSubscriber noLocalSubscriber2 = + session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true); + + // The NO-local subscriber should receive messages sent from connection3 + received = receiveMessage(noLocalSubscriber2, SEND_COUNT); + session2.commit(); + assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size()); + + noLocalSubscriber2.close(); + + } protected List<Message> receiveMessage(MessageConsumer messageConsumer, |