summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-02-29 23:47:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-02-29 23:47:22 +0000
commite7d54437156176ffac3b576749ec2e91bcd55abb (patch)
treeddfc9ae11c850a8591c0303d22fc160b7723dc98
parent3fb61f5354df0b77325e4fc88aa59213d3000a8e (diff)
downloadqpid-python-e7d54437156176ffac3b576749ec2e91bcd55abb.tar.gz
QPID-3605 : renamed method, corrected brace style for ifs, added tests (per Robbies review comments)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295341 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java41
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java65
2 files changed, 94 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 714953baec..27166e4384 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -127,11 +127,13 @@ public class TopicExchange extends AbstractExchange
{
if(argumentsContainFilter(oldArgs))
{
- result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue));
+ result.replaceQueueFilter(queue,
+ createMessageFilter(oldArgs, queue),
+ createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue,createSelectorFilter(args,queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
@@ -140,7 +142,7 @@ public class TopicExchange extends AbstractExchange
if(argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue));
+ result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
}
else
{
@@ -161,7 +163,7 @@ public class TopicExchange extends AbstractExchange
result = new TopicExchangeResult();
if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -174,7 +176,7 @@ public class TopicExchange extends AbstractExchange
{
if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -188,7 +190,7 @@ public class TopicExchange extends AbstractExchange
}
- private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+ private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
{
if(argumentsContainNoLocal(args))
{
@@ -381,7 +383,7 @@ public class TopicExchange extends AbstractExchange
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue()));
+ result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -478,8 +480,15 @@ public class TopicExchange extends AbstractExchange
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
NoLocalFilter that = (NoLocalFilter) o;
@@ -512,15 +521,25 @@ public class TopicExchange extends AbstractExchange
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
CompoundFilter that = (CompoundFilter) o;
if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
return false;
+ }
if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
return false;
+ }
return true;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
index d5cbaaa203..9dfd3c912a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
@@ -42,7 +42,7 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
protected static final int SEND_COUNT = 10;
- public void test() throws Exception
+ public void testNoLocalNotQueued() throws Exception
{
if(!isBrokerStorePersistent())
{
@@ -73,6 +73,8 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
assertEquals("Normal Subscriber Received no messages",
SEND_COUNT, received.size());
session.commit();
+
+ normalSubscriber.close();
connection.close();
//We didn't receive the messages on the durable queue for the no-local subscriber
@@ -94,6 +96,67 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
session2.commit();
assertEquals("No Local Subscriber Received messages", 0, received.size());
+ noLocalSubscriber2.close();
+
+
+ }
+
+
+ public void testNonNoLocalQueued() throws Exception
+ {
+ if(!isBrokerStorePersistent())
+ {
+ fail("This test requires a broker with a persistent store");
+ }
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber =
+ session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
+
+
+ sendMessage(session, topic, SEND_COUNT);
+
+ // Check messages can be received as expected.
+ connection.start();
+
+ List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+ assertEquals("No Local Subscriber Received messages", 0, received.size());
+
+
+
+ session.commit();
+
+ Connection connection3 = getConnection();
+ Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(session3, topic, SEND_COUNT);
+
+
+ connection.close();
+
+ //We didn't receive the messages on the durable queue for the no-local subscriber
+ //so they are still on the broker. Restart the broker, prompting their recovery.
+ restartBroker();
+
+ Connection connection2 = getConnection();
+ connection2.start();
+
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber2 =
+ session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
+
+ // The NO-local subscriber should receive messages sent from connection3
+ received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+ session2.commit();
+ assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
+
+ noLocalSubscriber2.close();
+
+
}
protected List<Message> receiveMessage(MessageConsumer messageConsumer,