summaryrefslogtreecommitdiff
path: root/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-17 21:37:29 +0000
committerKeith Wall <kwall@apache.org>2014-06-17 21:37:29 +0000
commit710bcbc3552d7d7dc936708207bcab31748664b6 (patch)
treeddbc048d8f4ac23f3f48e4e14f71761f57dd2196 /java/broker-core/src
parentb9a033ffb01107cc26162c98dbe21987e0983fd8 (diff)
downloadqpid-python-710bcbc3552d7d7dc936708207bcab31748664b6.tar.gz
QPID-5785: [Java Broker] Fix NPE when a topic exchange is closed after a message filter has been added to an existing binding.
This corrects a regression introduced by QPID-5709 that was causing a NPE to appear in the test log for QueueBindingTest. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1603294 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java19
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java208
2 files changed, 185 insertions, 42 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 40a8a8f7ee..3e5d7dedb4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -76,8 +76,10 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
assert queue != null;
assert bindingKey != null;
- _logger.debug("Updating binding of queue " + queue.getName() + " with routing key " + bindingKey);
-
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Updating binding of queue " + queue.getName() + " with routing key " + bindingKey);
+ }
String routingKey = TopicNormalizer.normalize(bindingKey);
@@ -87,6 +89,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
if (_bindings.containsKey(binding))
{
Map<String, Object> oldArgs = _bindings.get(binding);
+ _bindings.put(binding, args);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if (FilterSupport.argumentsContainFilter(args))
@@ -136,8 +139,10 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
assert queue != null;
assert bindingKey != null;
- _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey);
-
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey);
+ }
String routingKey = TopicNormalizer.normalize(bindingKey);
@@ -252,6 +257,12 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
if(_bindings.containsKey(binding))
{
Map<String,Object> bindingArgs = _bindings.remove(binding);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("deregisterQueue " + bindingArgs);
+ }
+
String bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 5dc96a1ac9..d7779390b1 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.exchange;
+import static org.apache.qpid.common.AMQPFilterTypes.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,6 +33,7 @@ import java.util.UUID;
import org.junit.Assert;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -64,6 +67,7 @@ public class TopicExchangeTest extends QpidTestCase
attributes.put(Exchange.DURABLE, false);
_exchange = new TopicExchange(attributes, _vhost);
+ _exchange.open();
}
@Override
@@ -94,7 +98,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testNoRoute() throws Exception
{
AMQQueue<?> queue = createQueue("a*#b");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null);
routeMessage("a.b", 0l);
@@ -105,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws Exception
{
AMQQueue<?> queue = createQueue("ab");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
routeMessage("a.b",0l);
@@ -127,7 +131,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws Exception
{
AMQQueue<?> queue = createQueue("a*");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.*", queue, _exchange, null);
routeMessage("a.b",0l);
@@ -158,7 +162,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws Exception
{
AMQQueue<?> queue = createQueue("a#");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.#", queue, _exchange, null);
routeMessage("a.b.c",0l);
@@ -209,7 +213,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws Exception
{
AMQQueue<?> queue = createQueue("a");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null);
routeMessage("a.c.d.b",0l);
@@ -234,7 +238,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchAfterHash() throws Exception
{
AMQQueue<?> queue = createQueue("a#");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b.c", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.*.#.b.c", queue, _exchange, null);
int queueCount = routeMessage("a.c.b.b",0l);
@@ -272,11 +276,11 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws Exception
{
AMQQueue<?> queue = createQueue("a#");
- _exchange.registerQueue(createBinding(UUID.randomUUID(),
- "a.*.#.b.c.#.d",
- queue,
- _exchange,
- null));
+ createBinding(UUID.randomUUID(),
+ "a.*.#.b.c.#.d",
+ queue,
+ _exchange,
+ null);
int queueCount = routeMessage("a.c.b.b.c",0l);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
@@ -297,7 +301,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws Exception
{
AMQQueue<?> queue = createQueue("a#");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#.*.#.d", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.#.*.#.d", queue, _exchange, null);
int queueCount = routeMessage("a.c.b.b.c",0l);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
@@ -318,7 +322,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testSubMatchFails() throws Exception
{
AMQQueue<?> queue = createQueue("a");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b.c.d", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.b.c.d", queue, _exchange, null);
int queueCount = routeMessage("a.b.c",0l);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
@@ -327,27 +331,10 @@ public class TopicExchangeTest extends QpidTestCase
}
- private int routeMessage(String routingKey, long messageNumber)
- {
- ServerMessage message = mock(ServerMessage.class);
- when(message.getInitialRoutingAddress()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
- MessageReference ref = mock(MessageReference.class);
- when(ref.getMessage()).thenReturn(message);
- when(message.newReference()).thenReturn(ref);
- when(message.getMessageNumber()).thenReturn(messageNumber);
- for(BaseQueue q : queues)
- {
- q.enqueue(message, null);
- }
-
- return queues.size();
- }
-
public void testMoreRouting() throws Exception
{
AMQQueue<?> queue = createQueue("a");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
int queueCount = routeMessage("a.b.c",0l);
@@ -360,7 +347,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreQueue() throws Exception
{
AMQQueue<?> queue = createQueue("a");
- _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null));
+ createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
int queueCount = routeMessage("a",0l);
@@ -370,11 +357,119 @@ public class TopicExchangeTest extends QpidTestCase
}
- private static BindingImpl createBinding(UUID id,
- final String bindingKey,
- final AMQQueue queue,
- final ExchangeImpl exchange,
- final Map<String, Object> arguments)
+ public void testRouteWithJMSSelector() throws Exception
+ {
+ AMQQueue<?> queue = createQueue("queue1");
+ final String bindingKey = "bindingKey";
+
+ Map<String, Object> bindArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
+ createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, bindArgs);
+
+ ServerMessage matchMsg1 = mock(ServerMessage.class);
+ AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
+ when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1);
+ routeMessage(matchMsg1, bindingKey, 1);
+ Assert.assertEquals("First message should be routed to queue", 1, queue.getQueueDepthMessages());
+
+ ServerMessage nonmatchMsg2 = mock(ServerMessage.class);
+ AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 5));
+ when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2);
+ routeMessage(nonmatchMsg2, bindingKey, 2);
+ Assert.assertEquals("Second message should not be routed to queue", 1, queue.getQueueDepthMessages());
+
+ ServerMessage nonmatchMsg3 = mock(ServerMessage.class);
+ AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, Object>emptyMap());
+ when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3);
+ routeMessage(nonmatchMsg3, bindingKey, 3);
+ Assert.assertEquals("Third message should not be routed to queue", 1, queue.getQueueDepthMessages());
+
+ ServerMessage matchMsg4 = mock(ServerMessage.class);
+ AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
+ when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4);
+ routeMessage(matchMsg4, bindingKey, 4);
+ Assert.assertEquals("First message should be routed to queue", 2, queue.getQueueDepthMessages());
+
+ }
+
+ public void testUpdateBindingReplacingSelector() throws Exception
+ {
+ AMQQueue<?> queue = createQueue("queue1");
+ final String bindingKey = "a";
+
+ Map<String, Object> originalArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
+ createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, originalArgs);
+
+ AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
+ ServerMessage msg1 = mock(ServerMessage.class);
+ when(msg1.getMessageHeader()).thenReturn(mgsHeader1);
+
+ routeMessage(msg1, bindingKey, 1);
+ Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+ // Update the binding
+ Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
+ _exchange.replaceBinding(bindingKey, queue, newArgs);
+
+ // Message that would have matched the original selector but not the new
+ AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
+ ServerMessage msg2 = mock(ServerMessage.class);
+ when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
+
+ routeMessage(msg2, bindingKey, 2);
+ Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+ // Message that matches only the second
+ AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
+ ServerMessage msg3 = mock(ServerMessage.class);
+ when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+
+ routeMessage(msg3, bindingKey, 2);
+ Assert.assertEquals(2, queue.getQueueDepthMessages());
+
+ }
+
+ // This demonstrates QPID-5785. Deleting the exchange after this combination of binding
+ // updates generated a NPE
+ public void testUpdateBindingAddingSelector() throws Exception
+ {
+ AMQQueue<?> queue = createQueue("queue1");
+ final String bindingKey = "a";
+
+ BindingImpl binding = createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, null);
+
+ ServerMessage msg1 = mock(ServerMessage.class);
+
+ routeMessage(msg1, bindingKey, 1);
+ Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+ // Update the binding adding selector
+ Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
+ _exchange.replaceBinding(bindingKey, queue, newArgs);
+
+ // Message that does not match the new selector
+ AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
+ ServerMessage msg2 = mock(ServerMessage.class);
+ when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
+
+ routeMessage(msg2, bindingKey, 2);
+ Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+ // Message that matches the selector
+ AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
+ ServerMessage msg3 = mock(ServerMessage.class);
+ when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+
+ routeMessage(msg3, bindingKey, 2);
+ Assert.assertEquals(2, queue.getQueueDepthMessages());
+
+ _exchange.delete();
+ }
+
+ private BindingImpl createBinding(UUID id,
+ String bindingKey,
+ AMQQueue queue,
+ ExchangeImpl exchange,
+ Map<String, Object> arguments)
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Binding.NAME, bindingKey);
@@ -383,10 +478,47 @@ public class TopicExchangeTest extends QpidTestCase
attributes.put(Binding.ARGUMENTS, arguments);
}
attributes.put(Binding.ID, id);
- BindingImpl binding = new BindingImpl(attributes, queue, exchange);
+
+ BindingImpl binding = (BindingImpl) _vhost.getObjectFactory().create(Binding.class, attributes, queue, exchange);
binding.open();
return binding;
}
+ private int routeMessage(String routingKey, long messageNumber)
+ {
+ ServerMessage message = mock(ServerMessage.class);
+ return routeMessage(message, routingKey, messageNumber);
+ }
+
+ private int routeMessage(ServerMessage message, String routingKey, long messageNumber)
+ {
+ when(message.getInitialRoutingAddress()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
+ MessageReference ref = mock(MessageReference.class);
+ when(ref.getMessage()).thenReturn(message);
+ when(message.newReference()).thenReturn(ref);
+ when(message.getMessageNumber()).thenReturn(messageNumber);
+ for(BaseQueue q : queues)
+ {
+ q.enqueue(message, null);
+ }
+
+ return queues.size();
+ }
+
+ private AMQMessageHeader createMessageHeader(Map<String, Object> headers)
+ {
+ AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+ for(Map.Entry<String, Object> entry : headers.entrySet())
+ {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+
+ when(messageHeader.containsHeader(key)).thenReturn(true);
+ when(messageHeader.getHeader(key)).thenReturn(value);
+ }
+ return messageHeader;
+ }
+
}