/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * * */ package org.apache.qpid.server.exchange; import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; public class TopicExchangeTest extends InternalBrokerBaseCase { private TopicExchange _exchange; private VirtualHost _vhost; private MessageStore _store; private InternalTestProtocolSession _protocolSession; @Override public void setUp() throws Exception { super.setUp(); _exchange = new TopicExchange(); _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); _store = new MemoryMessageStore(); _protocolSession = new InternalTestProtocolSession(_vhost); } public void testNoRoute() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); routeMessage(message); Assert.assertEquals(0, queue.getMessageCount()); } public void testDirectMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } public void testStarMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*", queue,_exchange, null)); IncomingMessage message = createMessage("a.b"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); int queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a"); queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } public void testHashMatch() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.#", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); int queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b"); queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a"); queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("b"); queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } public void testMidHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.d.b"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c.b"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } public void testMatchafterHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b.c", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b.c.b"); queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b.c.b.c"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } public void testHashAfterHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b.c.#.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b.c"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c.d"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } public void testHashHash() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.#.*.#.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.c.b.b.c"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c.d"); routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } public void testSubMatchFails() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b.c.d", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } private int routeMessage(final IncomingMessage message) throws AMQException { MessageMetaData mmd = message.headersReceived(System.currentTimeMillis()); message.setStoredMessage(_store.addMessage(mmd)); message.enqueue(_exchange.route(message)); AMQMessage msg = new AMQMessage(message.getStoredMessage()); for(BaseQueue q : message.getDestinationQueues()) { q.enqueue(msg); } return message.getDestinationQueues().size(); } public void testMoreRouting() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a.b.c"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } public void testMoreQueue() throws AMQException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); IncomingMessage message = createMessage("a"); int queueCount = routeMessage(message); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } private IncomingMessage createMessage(String s) throws AMQException { MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); IncomingMessage message = new IncomingMessage(info); final ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); chb.setProperties(props); message.setContentHeaderBody(chb); return message; } class PublishInfo implements MessagePublishInfo { private AMQShortString _routingkey; PublishInfo(AMQShortString routingkey) { _routingkey = routingkey; } public AMQShortString getExchange() { return null; } public void setExchange(AMQShortString exchange) { } public boolean isImmediate() { return false; } public boolean isMandatory() { return true; } public AMQShortString getRoutingKey() { return _routingkey; } } }