diff options
author | Keith Wall <kwall@apache.org> | 2014-06-17 21:37:29 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-06-17 21:37:29 +0000 |
commit | 710bcbc3552d7d7dc936708207bcab31748664b6 (patch) | |
tree | ddbc048d8f4ac23f3f48e4e14f71761f57dd2196 /java/broker-core/src | |
parent | b9a033ffb01107cc26162c98dbe21987e0983fd8 (diff) | |
download | qpid-python-710bcbc3552d7d7dc936708207bcab31748664b6.tar.gz |
QPID-5785: [Java Broker] Fix NPE when a topic exchange is closed after a message filter has been added to an existing binding.
This corrects a regression introduced by QPID-5709 that was causing a NPE to appear in the test log for QueueBindingTest.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1603294 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 19 | ||||
-rw-r--r-- | java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java | 208 |
2 files changed, 185 insertions, 42 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 40a8a8f7ee..3e5d7dedb4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -76,8 +76,10 @@ public class TopicExchange extends AbstractExchange<TopicExchange> assert queue != null; assert bindingKey != null; - _logger.debug("Updating binding of queue " + queue.getName() + " with routing key " + bindingKey); - + if (_logger.isDebugEnabled()) + { + _logger.debug("Updating binding of queue " + queue.getName() + " with routing key " + bindingKey); + } String routingKey = TopicNormalizer.normalize(bindingKey); @@ -87,6 +89,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange> if (_bindings.containsKey(binding)) { Map<String, Object> oldArgs = _bindings.get(binding); + _bindings.put(binding, args); TopicExchangeResult result = _topicExchangeResults.get(routingKey); if (FilterSupport.argumentsContainFilter(args)) @@ -136,8 +139,10 @@ public class TopicExchange extends AbstractExchange<TopicExchange> assert queue != null; assert bindingKey != null; - _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey); - + if (_logger.isDebugEnabled()) + { + _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey); + } String routingKey = TopicNormalizer.normalize(bindingKey); @@ -252,6 +257,12 @@ public class TopicExchange extends AbstractExchange<TopicExchange> if(_bindings.containsKey(binding)) { Map<String,Object> bindingArgs = _bindings.remove(binding); + + if (_logger.isDebugEnabled()) + { + _logger.debug("deregisterQueue " + bindingArgs); + } + String bindingKey = TopicNormalizer.normalize(binding.getBindingKey()); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 5dc96a1ac9..d7779390b1 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.server.exchange; +import static org.apache.qpid.common.AMQPFilterTypes.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,6 +33,7 @@ import java.util.UUID; import org.junit.Assert; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -64,6 +67,7 @@ public class TopicExchangeTest extends QpidTestCase attributes.put(Exchange.DURABLE, false); _exchange = new TopicExchange(attributes, _vhost); + _exchange.open(); } @Override @@ -94,7 +98,7 @@ public class TopicExchangeTest extends QpidTestCase public void testNoRoute() throws Exception { AMQQueue<?> queue = createQueue("a*#b"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null); routeMessage("a.b", 0l); @@ -105,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws Exception { AMQQueue<?> queue = createQueue("ab"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null); routeMessage("a.b",0l); @@ -127,7 +131,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws Exception { AMQQueue<?> queue = createQueue("a*"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.*", queue, _exchange, null); routeMessage("a.b",0l); @@ -158,7 +162,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws Exception { AMQQueue<?> queue = createQueue("a#"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.#", queue, _exchange, null); routeMessage("a.b.c",0l); @@ -209,7 +213,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws Exception { AMQQueue<?> queue = createQueue("a"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null); routeMessage("a.c.d.b",0l); @@ -234,7 +238,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchAfterHash() throws Exception { AMQQueue<?> queue = createQueue("a#"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b.c", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.*.#.b.c", queue, _exchange, null); int queueCount = routeMessage("a.c.b.b",0l); @@ -272,11 +276,11 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws Exception { AMQQueue<?> queue = createQueue("a#"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), - "a.*.#.b.c.#.d", - queue, - _exchange, - null)); + createBinding(UUID.randomUUID(), + "a.*.#.b.c.#.d", + queue, + _exchange, + null); int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); @@ -297,7 +301,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws Exception { AMQQueue<?> queue = createQueue("a#"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#.*.#.d", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.#.*.#.d", queue, _exchange, null); int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); @@ -318,7 +322,7 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws Exception { AMQQueue<?> queue = createQueue("a"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b.c.d", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.b.c.d", queue, _exchange, null); int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); @@ -327,27 +331,10 @@ public class TopicExchangeTest extends QpidTestCase } - private int routeMessage(String routingKey, long messageNumber) - { - ServerMessage message = mock(ServerMessage.class); - when(message.getInitialRoutingAddress()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); - MessageReference ref = mock(MessageReference.class); - when(ref.getMessage()).thenReturn(message); - when(message.newReference()).thenReturn(ref); - when(message.getMessageNumber()).thenReturn(messageNumber); - for(BaseQueue q : queues) - { - q.enqueue(message, null); - } - - return queues.size(); - } - public void testMoreRouting() throws Exception { AMQQueue<?> queue = createQueue("a"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null); int queueCount = routeMessage("a.b.c",0l); @@ -360,7 +347,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws Exception { AMQQueue<?> queue = createQueue("a"); - _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null)); + createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null); int queueCount = routeMessage("a",0l); @@ -370,11 +357,119 @@ public class TopicExchangeTest extends QpidTestCase } - private static BindingImpl createBinding(UUID id, - final String bindingKey, - final AMQQueue queue, - final ExchangeImpl exchange, - final Map<String, Object> arguments) + public void testRouteWithJMSSelector() throws Exception + { + AMQQueue<?> queue = createQueue("queue1"); + final String bindingKey = "bindingKey"; + + Map<String, Object> bindArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5"); + createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, bindArgs); + + ServerMessage matchMsg1 = mock(ServerMessage.class); + AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); + when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1); + routeMessage(matchMsg1, bindingKey, 1); + Assert.assertEquals("First message should be routed to queue", 1, queue.getQueueDepthMessages()); + + ServerMessage nonmatchMsg2 = mock(ServerMessage.class); + AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 5)); + when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2); + routeMessage(nonmatchMsg2, bindingKey, 2); + Assert.assertEquals("Second message should not be routed to queue", 1, queue.getQueueDepthMessages()); + + ServerMessage nonmatchMsg3 = mock(ServerMessage.class); + AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, Object>emptyMap()); + when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3); + routeMessage(nonmatchMsg3, bindingKey, 3); + Assert.assertEquals("Third message should not be routed to queue", 1, queue.getQueueDepthMessages()); + + ServerMessage matchMsg4 = mock(ServerMessage.class); + AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); + when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4); + routeMessage(matchMsg4, bindingKey, 4); + Assert.assertEquals("First message should be routed to queue", 2, queue.getQueueDepthMessages()); + + } + + public void testUpdateBindingReplacingSelector() throws Exception + { + AMQQueue<?> queue = createQueue("queue1"); + final String bindingKey = "a"; + + Map<String, Object> originalArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5"); + createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, originalArgs); + + AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); + ServerMessage msg1 = mock(ServerMessage.class); + when(msg1.getMessageHeader()).thenReturn(mgsHeader1); + + routeMessage(msg1, bindingKey, 1); + Assert.assertEquals(1, queue.getQueueDepthMessages()); + + // Update the binding + Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6"); + _exchange.replaceBinding(bindingKey, queue, newArgs); + + // Message that would have matched the original selector but not the new + AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); + ServerMessage msg2 = mock(ServerMessage.class); + when(msg2.getMessageHeader()).thenReturn(mgsHeader2); + + routeMessage(msg2, bindingKey, 2); + Assert.assertEquals(1, queue.getQueueDepthMessages()); + + // Message that matches only the second + AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); + ServerMessage msg3 = mock(ServerMessage.class); + when(msg3.getMessageHeader()).thenReturn(mgsHeader3); + + routeMessage(msg3, bindingKey, 2); + Assert.assertEquals(2, queue.getQueueDepthMessages()); + + } + + // This demonstrates QPID-5785. Deleting the exchange after this combination of binding + // updates generated a NPE + public void testUpdateBindingAddingSelector() throws Exception + { + AMQQueue<?> queue = createQueue("queue1"); + final String bindingKey = "a"; + + BindingImpl binding = createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, null); + + ServerMessage msg1 = mock(ServerMessage.class); + + routeMessage(msg1, bindingKey, 1); + Assert.assertEquals(1, queue.getQueueDepthMessages()); + + // Update the binding adding selector + Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6"); + _exchange.replaceBinding(bindingKey, queue, newArgs); + + // Message that does not match the new selector + AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); + ServerMessage msg2 = mock(ServerMessage.class); + when(msg2.getMessageHeader()).thenReturn(mgsHeader2); + + routeMessage(msg2, bindingKey, 2); + Assert.assertEquals(1, queue.getQueueDepthMessages()); + + // Message that matches the selector + AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); + ServerMessage msg3 = mock(ServerMessage.class); + when(msg3.getMessageHeader()).thenReturn(mgsHeader3); + + routeMessage(msg3, bindingKey, 2); + Assert.assertEquals(2, queue.getQueueDepthMessages()); + + _exchange.delete(); + } + + private BindingImpl createBinding(UUID id, + String bindingKey, + AMQQueue queue, + ExchangeImpl exchange, + Map<String, Object> arguments) { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Binding.NAME, bindingKey); @@ -383,10 +478,47 @@ public class TopicExchangeTest extends QpidTestCase attributes.put(Binding.ARGUMENTS, arguments); } attributes.put(Binding.ID, id); - BindingImpl binding = new BindingImpl(attributes, queue, exchange); + + BindingImpl binding = (BindingImpl) _vhost.getObjectFactory().create(Binding.class, attributes, queue, exchange); binding.open(); return binding; } + private int routeMessage(String routingKey, long messageNumber) + { + ServerMessage message = mock(ServerMessage.class); + return routeMessage(message, routingKey, messageNumber); + } + + private int routeMessage(ServerMessage message, String routingKey, long messageNumber) + { + when(message.getInitialRoutingAddress()).thenReturn(routingKey); + List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + when(message.getMessageNumber()).thenReturn(messageNumber); + for(BaseQueue q : queues) + { + q.enqueue(message, null); + } + + return queues.size(); + } + + private AMQMessageHeader createMessageHeader(Map<String, Object> headers) + { + AMQMessageHeader messageHeader = mock(AMQMessageHeader.class); + for(Map.Entry<String, Object> entry : headers.entrySet()) + { + String key = entry.getKey(); + Object value = entry.getValue(); + + when(messageHeader.containsHeader(key)).thenReturn(true); + when(messageHeader.getHeader(key)).thenReturn(value); + } + return messageHeader; + } + } |