summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java194
1 files changed, 194 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
new file mode 100644
index 0000000000..7335d43b2e
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class FanoutExchangeTest extends TestCase
+{
+ private FanoutExchange _exchange;
+ private VirtualHost _virtualHost;
+
+ public void setUp() throws AMQException
+ {
+ CurrentActor.setDefault(mock(LogActor.class));
+
+ _exchange = new FanoutExchange();
+ _virtualHost = mock(VirtualHost.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), anyString())).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), anyString(), any(AMQQueue.class))).thenReturn(true);
+
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, "test", false, false);
+ }
+
+ public void testIsBoundStringMapAMQQueueWhenQueueIsNull()
+ {
+ assertFalse("calling isBound(AMQShortString,FieldTable,AMQQueue) with null queue should return false",
+ _exchange.isBound((String) null, (Map) null, (AMQQueue) null));
+ }
+
+ public void testIsBoundStringAMQQueueWhenQueueIsNull()
+ {
+ assertFalse("calling isBound(AMQShortString,AMQQueue) with null queue should return false",
+ _exchange.isBound((String) null, (AMQQueue) null));
+ }
+
+ public void testIsBoundAMQQueueWhenQueueIsNull()
+ {
+ assertFalse("calling isBound(AMQQueue) with null queue should return false", _exchange.isBound((AMQQueue) null));
+ }
+
+ public void testIsBoundStringMapAMQQueue() throws AMQSecurityException, AMQInternalException
+ {
+ AMQQueue queue = bindQueue();
+ assertTrue("Should return true for a bound queue",
+ _exchange.isBound("matters", null, queue));
+ }
+
+ public void testIsBoundStringAMQQueue() throws AMQSecurityException, AMQInternalException
+ {
+ AMQQueue queue = bindQueue();
+ assertTrue("Should return true for a bound queue",
+ _exchange.isBound("matters", queue));
+ }
+
+ public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
+ {
+ AMQQueue queue = bindQueue();
+ assertTrue("Should return true for a bound queue",
+ _exchange.isBound(queue));
+ }
+
+ private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
+ {
+ AMQQueue queue = mockQueue();
+ _exchange.addBinding("matters", queue, null);
+ return queue;
+ }
+
+ private AMQQueue mockQueue()
+ {
+ AMQQueue queue = mock(AMQQueue.class);
+ when(queue.getVirtualHost()).thenReturn(_virtualHost);
+ return queue;
+ }
+
+ public void testRoutingWithSelectors() throws Exception
+ {
+ AMQQueue queue1 = mockQueue();
+ AMQQueue queue2 = mockQueue();
+
+ _exchange.addBinding("key",queue1, null);
+ _exchange.addBinding("key",queue2, null);
+
+
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.removeBinding("key",queue2,null);
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ result = _exchange.route(mockMessage(false));
+
+ assertEquals("Expected message to be routed to queue1 only", 1, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+
+
+ result = _exchange.route(mockMessage(false));
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ }
+
+ private InboundMessage mockMessage(boolean val)
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader("select")).thenReturn(true);
+ when(header.getHeader("select")).thenReturn(val);
+ when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+ when(header.containsHeaders(anySet())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return names.size() == 1 && names.contains("select");
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
+ }
+}