diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-05-15 16:19:01 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-15 16:19:01 +0000 |
commit | 0843e8e4506a4b9e05c56822e50c7fd670ab531f (patch) | |
tree | 52e61e7cf913fc9d5958e3f6f507abcfb3346bbf | |
parent | d8284e2231a492553b902ca41d704bdc94e94994 (diff) | |
download | qpid-python-0843e8e4506a4b9e05c56822e50c7fd670ab531f.tar.gz |
QPID-3 Topic Matching with tests
A simple naive approach. Similar to C++ to be included for M2.
More elaborate pre-evaluated version will have to wait.
Once benchmarks have been performed we can evaluate performance advantages if any of that approach.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@538240 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 787 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 386cfd2349..e9c5b0024c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.exchange; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.StringTokenizer; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -56,6 +58,10 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private static final String TOPIC_SEPARATOR = "."; + private static final String AMQP_STAR = "*"; + private static final String AMQP_HASH = "#"; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -78,7 +84,7 @@ public class DestWildExchange extends AbstractExchange AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); - List<AMQQueue> queues = entry.getValue(); + List<AMQQueue> queues = getMatchedQueues(key); for (AMQQueue q : queues) { queueList.add(q.getName().toString()); @@ -118,10 +124,13 @@ public class DestWildExchange extends AbstractExchange return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; } - public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; - assert routingKey != null; + assert rKey != null; + + AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); @@ -142,15 +151,67 @@ public class DestWildExchange extends AbstractExchange } + private AMQShortString normalize(AMQShortString routingKey) + { + StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); + List<String> _subscription = new ArrayList<String>(); + + while (routingTokens.hasMoreTokens()) + { + _subscription.add(routingTokens.nextToken()); + } + + int size = _subscription.size(); + + for (int index = 0; index < size; index++) + { + //if there are more levels + if (index + 1 < size) + { + if (_subscription.get(index).equals(AMQP_HASH)) + { + if (_subscription.get(index + 1).equals(AMQP_HASH)) + { + // we don't need #.# delete this one + _subscription.remove(index); + size--; + //redo this normalisation + index--; + } + + if (_subscription.get(index + 1).equals(AMQP_STAR)) + { + // we don't want #.* swap to *.# + // remove it and put it in at index + 1 + _subscription.add(index + 1, _subscription.remove(index)); + } + } + }//if we have more levels + } + + StringBuilder sb = new StringBuilder(); + + for (String s : _subscription) + { + sb.append(s); + sb.append(TOPIC_SEPARATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return new AMQShortString(sb.toString()); + } + public void route(AMQMessage payload) throws AMQException { MessagePublishInfo info = payload.getMessagePublishInfo(); - final AMQShortString routingKey = info.getRoutingKey(); - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + final AMQShortString routingKey = normalize(info.getRoutingKey()); + + List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null) + if (queues == null || queues.size() == 0) { if (info.isMandatory()) { @@ -177,14 +238,14 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return queues != null && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) throws AMQException { - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return queues != null && !queues.isEmpty(); } @@ -205,10 +266,12 @@ public class DestWildExchange extends AbstractExchange return !_routingKey2queues.isEmpty(); } - public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; - assert routingKey != null; + assert rKey != null; + + AMQShortString routingKey = normalize(rKey); List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) @@ -241,4 +304,110 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); } } + + + private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) + { + List<AMQQueue> list = new LinkedList<AMQQueue>(); + StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); + + ArrayList<String> routingkeyList = new ArrayList<String>(); + + while (routingTokens.hasMoreTokens()) + { + String next = routingTokens.nextToken(); + if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) + { + continue; + } + + routingkeyList.add(next); + } + + for (AMQShortString queue : _routingKey2queues.keySet()) + { + StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + + ArrayList<String> queueList = new ArrayList<String>(); + + while (queTok.hasMoreTokens()) + { + queueList.add(queTok.nextToken()); + } + + + int depth = 0; + boolean matching = true; + boolean done = false; + int routingskip = 0; + int queueskip = 0; + + while (matching && !done) + { + if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + { + done = true; + + // if it was the routing key that ran out of digits + if (routingkeyList.size() == depth + routingskip) + { + if (queueList.size() > (depth + queueskip)) + { // a hash and it is the last entry + matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + } + } + else if (routingkeyList.size() > depth + routingskip) + { + // There is still more routing key to check + matching = false; + } + + + continue; + } + + // if the values on the two topics don't match + if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + { + if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + { + depth++; + + continue; + } + else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + { + // Is this a # at the end + if (queueList.size() == depth + queueskip + 1) + { + done = true; + continue; + } + + // otherwise # in the middle + while (routingkeyList.size() > depth + routingskip) + { + if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + { + queueskip++; + depth++; + break; + } + routingskip++; + } + continue; + } + matching = false; + } + depth++; + } + + if (matching) + { + list.addAll(_routingKey2queues.get(queue)); + } + } + + return list; + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java new file mode 100644 index 0000000000..0a3bc93763 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; + +import java.util.HashSet; +import java.util.List; +import java.util.LinkedList; + +public class DestWildExchangeTest extends TestCase +{ + + DestWildExchange _exchange; + + VirtualHost _vhost; + MessageStore _store; + StoreContext _context; + + + public void setUp() throws AMQException + { + _exchange = new DestWildExchange(); + _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); + _store = new MemoryMessageStore(); + _context = new StoreContext(); + } + + + public void testNoRoute() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + + + MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b")); + + AMQMessage message = new AMQMessage(0L, info, null); + + try + { + _exchange.route(message); + fail("Message has no route and shouldn't be routed"); + } + catch (NoRouteException nre) + { + //normal + } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testDirectMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + + AMQMessage message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + + public void testStarMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*"), queue, null); + + + AMQMessage message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testHashMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.#"), queue, null); + + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + + public void testMidHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + + + AMQMessage message = createMessage("a.c.d.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.c.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMatchafterHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null); + + + AMQMessage message = createMessage("a.c.b.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.b.c.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.b.c.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + + public void testHashAfterHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null); + + + AMQMessage message = createMessage("a.c.b.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + + message = createMessage("a.a.b.c.d"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testHashHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null); + + + AMQMessage message = createMessage("a.c.b.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.a.b.c.d"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testSubMatchFails() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null); + + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMoreRouting() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMoreQueue() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + + AMQMessage message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { + } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + private AMQMessage createMessage(String s) throws AMQException + { + MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); + + TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + + AMQMessage message = new AMQMessage(0L, info, trancontext); + message.setContentHeaderBody(new ContentHeaderBody()); + + return message; + } + + + class PublishInfo implements MessagePublishInfo + { + AMQShortString _routingkey; + + PublishInfo(AMQShortString routingkey) + { + _routingkey = routingkey; + } + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return true; + } + + public AMQShortString getRoutingKey() + { + return _routingkey; + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index ca352b2fd7..e6e378c9f8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -251,7 +251,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex ;
}
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
Message m = _bouncedMessageList.get(0);
assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
|