diff options
Diffstat (limited to 'java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java')
-rw-r--r-- | java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java new file mode 100644 index 0000000000..1e9dd54d85 --- /dev/null +++ b/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java @@ -0,0 +1,232 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.*; + +public class AbstractHeadersExchangeTest +{ + private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTest.class); + + private final HeadersExchange exchange = new HeadersExchange(); + protected final Set<TestQueue> queues = new HashSet<TestQueue>(); + private int count; + + protected TestQueue bindDefault(String... bindings) throws AMQException + { + return bind("Queue" + (++count), bindings); + } + + protected TestQueue bind(String queueName, String... bindings) throws AMQException + { + return bind(queueName, getHeaders(bindings)); + } + + protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + { + return bind(new TestQueue(queue), bindings); + } + + protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + { + return bind(queue, getHeaders(bindings)); + } + + protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + { + queues.add(queue); + exchange.registerQueue(null, queue, bindings); + return queue; + } + + + protected void route(Message m) throws AMQException + { + m.route(exchange); + } + + protected void routeAndTest(Message m, TestQueue... expected) throws AMQException + { + routeAndTest(m, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException + { + route(m); + for (TestQueue q : queues) + { + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } + } + } + + static FieldTable getHeaders(String... entries) + { + FieldTable headers = new FieldTable(); + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.put(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } + + static BasicPublishBody getPublishRequest(String id) + { + BasicPublishBody request = new BasicPublishBody(); + request.routingKey = id; + return request; + } + + static ContentHeaderBody getContentHeader(FieldTable headers) + { + ContentHeaderBody header = new ContentHeaderBody(); + header.properties = getProperties(headers); + return header; + } + + static BasicContentHeaderProperties getProperties(FieldTable headers) + { + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setHeaders(headers); + return properties; + } + + static class TestQueue extends AMQQueue + { + final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + + public TestQueue(String name) throws AMQException + { + super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); + } + + public void deliver(AMQMessage msg) throws AMQException + { + messages.add(new HeadersExchangeTest.Message(msg)); + } + } + + /** + * Just add some extra utility methods to AMQMessage to aid testing. + */ + static class Message extends AMQMessage + { + private static MessageStore _messageStore = new SkeletonMessageStore(); + + Message(String id, String... headers) throws AMQException + { + this(id, getHeaders(headers)); + } + + Message(String id, FieldTable headers) throws AMQException + { + this(getPublishRequest(id), getContentHeader(headers), null); + } + + private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + { + //super(_messageStore, publish, header, bodies); + super(null); + throw new AMQException("Fix this!!!!"); + } + + private Message(AMQMessage msg) throws AMQException + { + super(msg); + } + + void route(Exchange exchange) throws AMQException + { + exchange.route(this); + } + + boolean isInQueue(TestQueue queue) + { + return queue.messages.contains(this); + } + + public int hashCode() + { + try + { + return getKey().hashCode(); + } + catch (AMQException e) + { + _log.error("Error getting key: " + e, e); + return 0; + } + } + + public boolean equals(Object o) + { + return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + } + + private boolean equals(HeadersExchangeTest.Message m) + { + try + { + return getKey().equals(m.getKey()); + } + catch (AMQException e) + { + _log.error("Error getting key: " + e, e); + return false; + } + } + + public String toString() + { + try + { + return getKey().toString(); + } + catch (AMQException e) + { + _log.error("Error getting key: " + e, e); + return null; + } + } + + private Object getKey() throws AMQException + { + return getPublishBody().routingKey; + } + } +} |