summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-11-24 12:00:21 +0000
committerRobert Greig <rgreig@apache.org>2006-11-24 12:00:21 +0000
commit5a966454e848b2a869714d63a59d4cbccf1300df (patch)
tree77ca7be61e942dbe7f29646a96acdbaa5feeecd1
parentaf99c8e1644d7861bdfbbe4c84443fe55338697e (diff)
downloadqpid-python-5a966454e848b2a869714d63a59d4cbccf1300df.tar.gz
Merged systests from trunk rev 478842
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@478846 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/systests/src/test/java/log4j.properties28
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java183
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java215
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java184
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java78
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java296
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java266
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java313
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java257
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java50
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java175
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java54
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java124
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java49
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java258
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java174
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java144
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java55
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java87
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java67
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java42
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java295
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java66
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java79
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java57
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java107
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java52
-rw-r--r--java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java52
-rw-r--r--java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java216
31 files changed, 4227 insertions, 0 deletions
diff --git a/java/systests/src/test/java/log4j.properties b/java/systests/src/test/java/log4j.properties
new file mode 100644
index 0000000000..6d596d1d19
--- /dev/null
+++ b/java/systests/src/test/java/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
new file mode 100644
index 0000000000..1a15ca7561
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TxAckTest extends TestCase
+{
+ private Scenario individual;
+ private Scenario multiple;
+ private Scenario combined;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //ack only 5th msg
+ individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+ individual.update(5, false);
+
+ //ack all up to and including 5th msg
+ multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+ multiple.update(5, true);
+
+ //leave only 8th and 9th unacked
+ combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+ combined.update(3, false);
+ combined.update(5, true);
+ combined.update(7, true);
+ combined.update(2, true);//should be ignored
+ combined.update(1, false);//should be ignored
+ combined.update(10, false);
+ }
+
+ public void testPrepare() throws AMQException
+ {
+ individual.prepare();
+ multiple.prepare();
+ combined.prepare();
+ }
+
+ public void testUndoPrepare() throws AMQException
+ {
+ individual.undoPrepare();
+ multiple.undoPrepare();
+ combined.undoPrepare();
+ }
+
+ public void testCommit() throws AMQException
+ {
+ individual.commit();
+ multiple.commit();
+ combined.commit();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TxAckTest.class);
+ }
+
+ private class Scenario
+ {
+ private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
+ private final TxAck _op = new TxAck(_map);
+ private final List<Long> _acked;
+ private final List<Long> _unacked;
+
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ {
+ for(int i = 0; i < messageCount; i++)
+ {
+ long deliveryTag = i + 1;
+ _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
+ }
+ _acked = acked;
+ _unacked = unacked;
+ }
+
+ void update(long deliverytag, boolean multiple)
+ {
+ _op.update(deliverytag, multiple);
+ }
+
+ private void assertCount(List<Long> tags, int expected)
+ {
+ for(long tag : tags)
+ {
+ UnacknowledgedMessage u = _messages.get(tag);
+ assertTrue("Message not found for tag " + tag, u != null);
+ ((TestMessage) u.message).assertCountEquals(expected);
+ }
+ }
+
+ void prepare() throws AMQException
+ {
+ _op.consolidate();
+ _op.prepare();
+
+ assertCount(_acked, -1);
+ assertCount(_unacked, 0);
+
+ }
+ void undoPrepare()
+ {
+ _op.consolidate();
+ _op.undoPrepare();
+
+ assertCount(_acked, 1);
+ assertCount(_unacked, 0);
+ }
+
+ void commit()
+ {
+ _op.consolidate();
+ _op.commit();
+
+
+ //check acked messages are removed from map
+ HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
+ keys.retainAll(_acked);
+ assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+ //check unacked messages are still in map
+ keys = new HashSet<Long>(_unacked);
+ keys.removeAll(_messages.keySet());
+ assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+ }
+ }
+
+ private class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag)
+ {
+ super(new TestableMemoryMessageStore(), null);
+ _tag = tag;
+ }
+
+ public void incrementReference()
+ {
+ _count++;
+ }
+
+ public void decrementReference()
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
new file mode 100644
index 0000000000..53ae097ea6
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+
+public class AbstractHeadersExchangeTest extends TestCase
+{
+ private final HeadersExchange exchange = new HeadersExchange();
+ protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+ private int count;
+
+ protected TestQueue bindDefault(String... bindings) throws AMQException
+ {
+ return bind("Queue" + (++count), bindings);
+ }
+
+ protected TestQueue bind(String queueName, String... bindings) throws AMQException
+ {
+ return bind(queueName, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+ {
+ return bind(new TestQueue(queue), bindings);
+ }
+
+ protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+ {
+ return bind(queue, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ {
+ queues.add(queue);
+ exchange.registerQueue(null, queue, bindings);
+ return queue;
+ }
+
+
+ protected void route(Message m) throws AMQException
+ {
+ m.route(exchange);
+ }
+
+ protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+ {
+ route(m);
+ for (TestQueue q : queues)
+ {
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
+ }
+ }
+
+ static FieldTable getHeaders(String... entries)
+ {
+ FieldTable headers = new FieldTable();
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
+
+ static BasicPublishBody getPublishRequest(String id)
+ {
+ BasicPublishBody request = new BasicPublishBody();
+ request.routingKey = id;
+ return request;
+ }
+
+ static ContentHeaderBody getContentHeader(FieldTable headers)
+ {
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.properties = getProperties(headers);
+ return header;
+ }
+
+ static BasicContentHeaderProperties getProperties(FieldTable headers)
+ {
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setHeaders(headers);
+ return properties;
+ }
+
+ static class TestQueue extends AMQQueue
+ {
+ final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+ public TestQueue(String name) throws AMQException
+ {
+ super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
+ }
+
+ public void deliver(AMQMessage msg) throws AMQException
+ {
+ messages.add(new HeadersExchangeTest.Message(msg));
+ }
+ }
+
+ /**
+ * Just add some extra utility methods to AMQMessage to aid testing.
+ */
+ static class Message extends AMQMessage
+ {
+ private static MessageStore _messageStore = new SkeletonMessageStore();
+
+ Message(String id, String... headers) throws AMQException
+ {
+ this(id, getHeaders(headers));
+ }
+
+ Message(String id, FieldTable headers) throws AMQException
+ {
+ this(getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ {
+ super(_messageStore, publish, header, bodies);
+ }
+
+ private Message(AMQMessage msg) throws AMQException
+ {
+ super(msg);
+ }
+
+ void route(Exchange exchange) throws AMQException
+ {
+ exchange.route(this);
+ }
+
+ boolean isInQueue(TestQueue queue)
+ {
+ return queue.messages.contains(this);
+ }
+
+ public int hashCode()
+ {
+ return getKey().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ }
+
+ private boolean equals(HeadersExchangeTest.Message m)
+ {
+ return getKey().equals(m.getKey());
+ }
+
+ public String toString()
+ {
+ return getKey().toString();
+ }
+
+ private Object getKey()
+ {
+ return getPublishBody().routingKey;
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
new file mode 100644
index 0000000000..ff0d58ad69
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of regsitrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd headers will contain the 'Odd' header.
+ * In additions each regsitration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+ private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+ private final TestQueue[] queues;
+ private final Mode mode;
+
+ public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+ {
+ this.mode = mode;
+ queues = new TestQueue[registrations];
+ for (int i = 0; i < queues.length; i++)
+ {
+ switch(mode)
+ {
+ case ALL:
+ queues[i] = bind(new FastQueue("Queue" + i), "All");
+ break;
+ case ODD_OR_EVEN:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+ break;
+ case SPECIFIC:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+ break;
+ }
+ }
+ }
+
+ void sendToAll(int count) throws AMQException
+ {
+ send(count, "All=True");
+ }
+
+ void sendToOdd(int count) throws AMQException
+ {
+ send(count, "All=True", "Odd=True");
+ }
+
+ void sendToEven(int count) throws AMQException
+ {
+ send(count, "All=True", "Even=True");
+ }
+
+ void sendToAllSpecifically(int count) throws AMQException
+ {
+ for (int i = 0; i < queues.length; i++)
+ {
+ sendToSpecific(count, i);
+ }
+ }
+
+ void sendToSpecific(int count, int index) throws AMQException
+ {
+ send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+ }
+
+ private void send(int count, String... headers) throws AMQException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ route(new Message("Message" + i, headers));
+ }
+ }
+
+ private static String oddOrEven(int i)
+ {
+ return (i % 2 == 0 ? "Even" : "Odd");
+ }
+
+ static class FastQueue extends TestQueue
+ {
+
+ public FastQueue(String name) throws AMQException
+ {
+ super(name);
+ }
+
+ public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+ {
+ //just discard as we are not testing routing functionality here
+ }
+ }
+
+ static class Test extends TimedRun
+ {
+ private final Mode mode;
+ private final int registrations;
+ private final int count;
+ private HeadersExchangePerformanceTest test;
+
+ Test(Mode mode, int registrations, int count)
+ {
+ super(mode + ", registrations=" + registrations + ", count=" + count);
+ this.mode = mode;
+ this.registrations = registrations;
+ this.count = count;
+ }
+
+ protected void setup() throws Exception
+ {
+ test = new HeadersExchangePerformanceTest(mode, registrations);
+ run(100); //do a warm up run before times start
+ }
+
+ protected void teardown() throws Exception
+ {
+ test = null;
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ run(count);
+ }
+
+ private void run(int count) throws Exception
+ {
+ switch(mode)
+ {
+ case ALL:
+ test.sendToAll(count);
+ break;
+ default:
+ System.out.println("Test for " + mode + " not yet implemented.");
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int registrations = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ TimedRun test = new Test(Mode.ALL, registrations, messages);
+ AveragedRun tests = new AveragedRun(test, iterations);
+ System.out.println(tests.call());
+ }
+}
+
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
new file mode 100644
index 0000000000..1c80e521ca
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+
+public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ }
+
+ public void testSimple() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+ TestQueue q4 = bindDefault("F0001=Bear");
+ TestQueue q5 = bindDefault("F0000", "F0001");
+ TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
+ TestQueue q7 = bindDefault("F0000", "F0001=Bear");
+ TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
+ TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana");
+ TestQueue q10 = bindDefault("F0000=Apple", "F0001");
+
+ routeAndTest(new Message("Message1", "F0000"), q1);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public void testAny() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
+ TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
+ TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
+ TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
+ TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any");
+ TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(new Message("Message1", "F0000"), q1, q3);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(HeadersExchangeTest.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
new file mode 100644
index 0000000000..81dea32a76
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
@@ -0,0 +1,296 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultCloseFuture;
+import org.apache.mina.common.support.DefaultWriteFuture;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+public class MockIoSession implements IoSession
+{
+ private AMQProtocolSession _protocolSession;
+
+ /**
+ * Stores the last response written
+ */
+ private Object _lastWrittenObject;
+
+ private boolean _closing;
+
+ public MockIoSession()
+ {
+ }
+
+ public Object getLastWrittenObject()
+ {
+ return _lastWrittenObject;
+ }
+
+ public IoService getService()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public WriteFuture write(Object message)
+ {
+ WriteFuture wf = new DefaultWriteFuture(null);
+ _lastWrittenObject = message;
+ return wf;
+ }
+
+ public CloseFuture close()
+ {
+ _closing = true;
+ CloseFuture cf = new DefaultCloseFuture(null);
+ cf.setClosed();
+ return cf;
+ }
+
+ public Object getAttachment()
+ {
+ return _protocolSession;
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ Object current = _protocolSession;
+ _protocolSession = (AMQProtocolSession) attachment;
+ return current;
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isConnected()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isClosing()
+ {
+ return _closing;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
new file mode 100644
index 0000000000..e76c164f64
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
+{
+ private AMQPFastProtocolHandler _protocolHandler;
+
+ private MockIoSession _mockIoSession;
+
+ /**
+ * We need to use the object encoder mechanism so to allow us to retrieve the
+ * output (a bytebuffer) we define our own encoder output class. The encoder
+ * writes the encoded data to this class, from where we can retrieve it during
+ * the test run.
+ */
+ private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+ {
+ public ByteBuffer result;
+
+ public void write(ByteBuffer buf)
+ {
+ result = buf;
+ }
+
+ public void mergeAll()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public WriteFuture flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+ {
+ public Object result;
+
+ public void write(Object buf)
+ {
+ result = buf;
+ }
+
+ public void flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _mockIoSession = new MockIoSession();
+ _protocolHandler = new AMQPFastProtocolHandler(null, null);
+ }
+
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol classes
+ * @throws Exception
+ */
+ public void testDecoderValidateProtocolClass() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolClass = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolClassException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolClassException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol instance numbers
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolInstance() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolInstance = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolInstanceException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolInstanceException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol major
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMajor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMajor = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol minor
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMinor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMinor = 99;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder accepts a valid PI
+ * @throws Exception
+ */
+ public void testDecoderValidatesHeader() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.header = new char[] {'P', 'Q', 'M', 'A' };
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolHeaderException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolHeaderException, got " + e);
+ }
+ }
+
+ /**
+ * Test that a valid header is passed by the decoder.
+ * @throws Exception
+ */
+ public void testDecoderAcceptsValidHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ decodePI(pi);
+ }
+
+ /**
+ * This test checks that an invalid protocol header results in the
+ * connection being closed.
+ */
+ public void testInvalidProtocolHeaderClosesConnection() throws Exception
+ {
+ AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+ _protocolHandler.exceptionCaught(_mockIoSession, pe);
+ assertNotNull(_mockIoSession.getLastWrittenObject());
+ Object piResponse = _mockIoSession.getLastWrittenObject();
+ assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+ ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+ assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+ createValidProtocolInitiation());
+ assertTrue("Session has not been closed", _mockIoSession.isClosing());
+ }
+
+ private ProtocolInitiation createValidProtocolInitiation()
+ {
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+ }
+
+ /**
+ * Helper that encodes a protocol initiation and attempts to decode it
+ * @param pi
+ * @throws Exception
+ */
+ private void decodePI(ProtocolInitiation pi) throws Exception
+ {
+ // we need to do this test at the level of the decoder since we initially only expect PI frames
+ // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+ // a different type of frame
+ AMQDecoder decoder = new AMQDecoder(true);
+ AMQEncoder encoder = new AMQEncoder();
+ TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+ encoder.encode(_mockIoSession, pi, peo);
+ TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+ decoder.decode(_mockIoSession, peo.result, pdo);
+ ((ProtocolInitiation) pdo.result).checkVersion(this);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestProtocolInitiation.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
new file mode 100644
index 0000000000..c2511f0a99
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest extends TestCase
+{
+ private static final Logger _log = Logger.getLogger(AckTest.class);
+
+ private SubscriptionImpl _subscription;
+
+ private MockProtocolSession _protocolSession;
+
+ private TestableMemoryMessageStore _messageStore;
+
+ private AMQChannel _channel;
+
+ private SubscriptionSet _subscriptionManager;
+
+ private AMQQueue _queue;
+
+ public AckTest() throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _messageStore = new TestableMemoryMessageStore();
+ _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
+ _protocolSession = new MockProtocolSession(_messageStore);
+ _protocolSession.addChannel(_channel);
+ _subscriptionManager = new SubscriptionSet();
+ _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
+ }
+
+ private void publishMessages(int count) throws AMQException
+ {
+ publishMessages(count, false);
+ }
+
+ private void publishMessages(int count, boolean persistent) throws AMQException
+ {
+ for (int i = 1; i <= count; i++)
+ {
+ BasicPublishBody publishBody = new BasicPublishBody();
+ publishBody.routingKey = "rk";
+ publishBody.exchange = "someExchange";
+ AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+ if (persistent)
+ {
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ //This is DeliveryMode.PERSISTENT
+ b.setDeliveryMode((byte) 2);
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+ msg.setContentHeaderBody(cb);
+ }
+ else
+ {
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ }
+ _subscription.send(msg, _queue);
+ }
+ }
+
+ /**
+ * Tests that the acknowledgements are correctly associated with a channel and
+ * order is preserved when acks are enabled
+ */
+ public void testAckChannelAssociationTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount, true);
+
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
+
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ for (int i = 1; i <= map.size(); i++)
+ {
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ }
+
+ assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ public void testNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMap().size() == 0);
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ public void testSingleAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, false);
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount - 1);
+
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ // 5 is the delivery tag of the message that *should* be removed
+ if (++i == 5)
+ {
+ ++i;
+ }
+ }
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ public void testMultiAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, true);
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+ /**
+ * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
+ */
+ public void testMultiAckAllReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(0, true);
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+ public void testPrefetchHighLow() throws AMQException
+ {
+ int lowMark = 5;
+ int highMark = 10;
+
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchLowMarkCount(lowMark);
+ _channel.setPrefetchHighMarkCount(highMark);
+
+ assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
+ assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
+
+ publishMessages(highMark);
+
+ // at this point we should have sent out only highMark messages
+ // which have not bee received so will be queued up in the channel
+ // which should be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == highMark);
+
+ //acknowledge messages so we are just above lowMark
+ _channel.acknowledgeMessage(lowMark - 1, true);
+
+ //we should still be suspended
+ assertTrue(_subscription.isSuspended());
+ assertTrue(map.size() == lowMark + 1);
+
+ //acknowledge one more message
+ _channel.acknowledgeMessage(lowMark, true);
+
+ //and suspension should be lifted
+ assertTrue(!_subscription.isSuspended());
+
+ //pubilsh more msgs so we are just below the limit
+ publishMessages(lowMark - 1);
+
+ //we should not be suspended
+ assertTrue(!_subscription.isSuspended());
+
+ //acknowledge all messages
+ _channel.acknowledgeMessage(0, true);
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ //map will be empty
+ assertTrue(map.size() == 0);
+ }
+
+ public void testPrefetch() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchCount(5);
+
+ assertTrue(_channel.getPrefetchCount() == 5);
+
+ final int msgCount = 5;
+ publishMessages(msgCount);
+
+ // at this point we should have sent out only 5 messages with a further 5 queued
+ // up in the channel which should now be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+ _channel.acknowledgeMessage(5, true);
+ assertTrue(!_subscription.isSuspended());
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ assertTrue(map.size() == 0);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AckTest.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
new file mode 100644
index 0000000000..a76db6a728
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests delivery in the face of concurrent incoming _messages, subscription alterations
+ * and attempts to asynchronously process queued _messages.
+ */
+public class ConcurrencyTest extends MessageTestHelper
+{
+ private final Random random = new Random();
+
+ private final int numMessages = 1000;
+
+ private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+ private final Set<Subscription> _active = new HashSet<Subscription>();
+ private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private int next = 0;//index to next message to send
+ private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final Executor _executor = new OnCurrentThreadExecutor();
+ private final List<Thread> _threads = new ArrayList<Thread>();
+
+ private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
+ private final DeliveryManager _deliveryMgr;
+
+ private boolean isComplete;
+ private boolean failed;
+
+ public ConcurrencyTest() throws Exception
+ {
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+
+ public void testConcurrent1() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(1, 4, 4, 4);
+ doRun();
+ check();
+ }
+
+ public void testConcurrent2() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(4, 2, 2, 2);
+ doRun();
+ check();
+ }
+
+ void check()
+ {
+ assertFalse("Failed", failed);
+
+ _deliveryMgr.processAsync(_executor);
+
+ assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
+ for(int i = 0; i < _messages.size(); i++)
+ {
+ assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
+ }
+ }
+
+ void initSubscriptions(int subscriptions)
+ {
+ for(int i = 0; i < subscriptions; i++)
+ {
+ _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+ }
+ }
+
+ void initMessages(int messages) throws AMQException
+ {
+ for(int i = 0; i < messages; i++)
+ {
+ _messages.add(message());
+ }
+ }
+
+ void initThreads(int senders, int subscribers, int suspenders, int processors)
+ {
+ addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
+ addThreads(subscribers, new Subscriber());
+ addThreads(suspenders, new Suspender());
+ addThreads(processors, new Processor());
+ }
+
+ void addThreads(int count, Runnable runner)
+ {
+ for(int i = 0; i < count; i++)
+ {
+ _threads.add(new Thread(runner, runner.toString()));
+ }
+ }
+
+ void doRun() throws InterruptedException
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private void toggle(Subscription s)
+ {
+ synchronized (_active)
+ {
+ if (_active.contains(s))
+ {
+ _active.remove(s);
+ Subscription result = _subscriptionMgr.removeSubscriber(s);
+ assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
+ result != null && result.equals(s));
+ }
+ else
+ {
+ _active.add(s);
+ _subscriptionMgr.addSubscriber(s);
+ }
+ }
+ }
+
+ private AMQMessage nextMessage()
+ {
+ synchronized (_messages)
+ {
+ if (next < _messages.size())
+ {
+ return _messages.get(next++);
+ }
+ else
+ {
+ if (!_deliveryMgr.hasQueuedMessages()) {
+ isComplete = true;
+ }
+ return null;
+ }
+ }
+ }
+
+ private boolean randomBoolean()
+ {
+ return random.nextBoolean();
+ }
+
+ private TestSubscription randomSubscriber()
+ {
+ return _subscribers.get(random.nextInt(_subscribers.size()));
+ }
+
+ private class Sender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ AMQMessage msg = nextMessage();
+ if (msg != null)
+ {
+ _deliveryMgr.deliver(toString(), msg);
+ }
+ }
+ }
+
+ private class OrderedSender extends Sender
+ {
+ synchronized void doRun() throws Throwable
+ {
+ super.doRun();
+ }
+ }
+
+ private class Suspender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ randomSubscriber().setSuspended(randomBoolean());
+ }
+ }
+
+ private class Subscriber extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ toggle(randomSubscriber());
+ }
+ }
+
+ private class Processor extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ _deliveryMgr.processAsync(_executor);
+ }
+ }
+
+ private abstract class Runner implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ while (!stop())
+ {
+ doRun();
+ }
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ t.printStackTrace();
+ }
+ }
+
+ abstract void doRun() throws Throwable;
+
+ boolean stop()
+ {
+ return isComplete || failed;
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConcurrencyTest.class);
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
new file mode 100644
index 0000000000..3072d44f48
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.DeliveryManagerTest;
+
+public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
+{
+ public ConcurrentDeliveryManagerTest() throws Exception
+ {
+ try
+ {
+ System.setProperty("concurrentdeliverymanager","true");
+ _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ throw new AMQException("Could not initialise delivery manager", t);
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
new file mode 100644
index 0000000000..9cfd9458d1
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.AMQException;
+
+import junit.framework.TestSuite;
+
+abstract public class DeliveryManagerTest extends MessageTestHelper
+{
+ protected final SubscriptionSet _subscriptions = new SubscriptionSet();
+ protected DeliveryManager _mgr;
+
+ public DeliveryManagerTest() throws Exception
+ {
+ }
+
+ public void testStartInQueueingMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ for (int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ TestSubscription s1 = new TestSubscription("1");
+ TestSubscription s2 = new TestSubscription("2");
+ _subscriptions.addSubscriber(s1);
+ _subscriptions.addSubscriber(s2);
+
+ for (int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertTrue(s1.getMessages().isEmpty());
+ assertTrue(s2.getMessages().isEmpty());
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+
+ assertEquals(messages.length / 2, s1.getMessages().size());
+ assertEquals(messages.length / 2, s2.getMessages().size());
+
+ for (int i = 0; i < messages.length; i++)
+ {
+ if (i % 2 == 0)
+ {
+ assertTrue(s1.getMessages().get(i / 2) == messages[i]);
+ }
+ else
+ {
+ assertTrue(s2.getMessages().get(i / 2) == messages[i]);
+ }
+ }
+ }
+
+ public void testStartInDirectMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ TestSubscription s1 = new TestSubscription("1");
+ _subscriptions.addSubscriber(s1);
+
+ for (int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertEquals(batch, s1.getMessages().size());
+ for (int i = 0; i < batch; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i));
+ }
+ s1.getMessages().clear();
+ assertEquals(0, s1.getMessages().size());
+
+ s1.setSuspended(true);
+ for (int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(0, s1.getMessages().size());
+ s1.setSuspended(false);
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(messages.length - batch, s1.getMessages().size());
+
+ for (int i = batch; i < messages.length; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i - batch));
+ }
+
+ }
+
+ public void testNoConsumers() throws AMQException
+ {
+ try
+ {
+ AMQMessage msg = message(true);
+ _mgr.deliver("Me", msg);
+ msg.checkDeliveredToConsumer();
+ fail("expected exception did not occur");
+ }
+ catch (NoConsumersException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected NoConsumersException, got " + e);
+ }
+ }
+
+ public void testNoActiveConsumers() throws AMQException
+ {
+ try
+ {
+ TestSubscription s = new TestSubscription("A");
+ _subscriptions.addSubscriber(s);
+ s.setSuspended(true);
+ AMQMessage msg = message(true);
+ _mgr.deliver("Me", msg);
+ msg.checkDeliveredToConsumer();
+ fail("expected exception did not occur");
+ }
+ catch (NoConsumersException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected NoConsumersException, got " + e);
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
+ suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
+ return suite;
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
new file mode 100644
index 0000000000..8570e6521f
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+import junit.framework.TestCase;
+
+class MessageTestHelper extends TestCase
+{
+ private final MessageStore _messageStore = new SkeletonMessageStore();
+
+ MessageTestHelper() throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ }
+
+ AMQMessage message() throws AMQException
+ {
+ return message(false);
+ }
+
+ AMQMessage message(boolean immediate) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.immediate = immediate;
+ return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
new file mode 100644
index 0000000000..bb8fd5bc19
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+
+import javax.security.sasl.SaslServer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A protocol session that can be used for testing purposes.
+ */
+public class MockProtocolSession implements AMQProtocolSession
+{
+ private MessageStore _messageStore;
+
+ private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+ public MockProtocolSession(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
+ {
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+ }
+
+ public String getContextKey()
+ {
+ return null;
+ }
+
+ public void setContextKey(String contextKey)
+ {
+ }
+
+ public AMQChannel getChannel(int channelId)
+ {
+ AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channel id: " + channelId);
+ }
+ else
+ {
+ return channel;
+ }
+ }
+
+ public void addChannel(AMQChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Channel must not be null");
+ }
+ else
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
+ }
+
+ public void closeChannel(int channelId) throws AMQException
+ {
+ }
+
+ public void removeChannel(int channelId)
+ {
+ _channelMap.remove(channelId);
+ }
+
+ public void initHeartbeats(int delay)
+ {
+ }
+
+ public void closeSession() throws AMQException
+ {
+ }
+
+ public Object getKey()
+ {
+ return null;
+ }
+
+ public String getLocalFQDN()
+ {
+ return null;
+ }
+
+ public SaslServer getSaslServer()
+ {
+ return null;
+ }
+
+ public void setSaslServer(SaslServer saslServer)
+ {
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
new file mode 100644
index 0000000000..11c0026455
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+ QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory, queueCount, messages);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java
new file mode 100644
index 0000000000..5b3857396d
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.RunStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class QueuePerfTest extends TimedRun
+{
+ private final Factory _factory;
+ private final int _queueCount;
+ private final int _messages;
+ private final String _msg = "";
+ private List<Queue<String>> _queues;
+
+ QueuePerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory + ", " + queueCount + ", " + messages);
+ _factory = factory;
+ _queueCount = queueCount;
+ _messages = messages;
+ }
+
+ protected void setup() throws Exception
+ {
+ //init
+ int count = Integer.getInteger("prepopulate", 0);
+// System.err.println("Prepopulating with " + count + " items");
+ _queues = new ArrayList<Queue<String>>(_queueCount);
+ for (int i = 0; i < _queueCount; i++)
+ {
+ Queue<String> q = _factory.create();
+ for(int j = 0; j < count; ++j)
+ {
+ q.add("Item"+ j);
+ }
+ _queues.add(q);
+ }
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ //dispatch
+ for (int i = 0; i < _messages; i++)
+ {
+ for (Queue<String> q : _queues)
+ {
+ q.offer(_msg);
+ q.poll();
+ }
+ }
+ }
+
+ static interface Factory
+ {
+ Queue<String> create();
+ }
+
+ static Factory CONCURRENT = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new ConcurrentLinkedQueue<String>();
+ }
+
+ public String toString()
+ {
+ return "ConcurrentLinkedQueue";
+ }
+
+ };
+
+ static Factory SYNCHRONIZED = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new SynchronizedQueue<String>(new LinkedList<String>());
+ }
+
+
+ public String toString()
+ {
+ return "Synchronized LinkedList";
+ }
+ };
+
+ static Factory PLAIN = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new LinkedList<String>();
+ }
+
+ public String toString()
+ {
+ return "Plain LinkedList";
+ }
+ };
+
+ static class SynchronizedQueue<E> implements Queue<E>
+ {
+ private final Queue<E> queue;
+
+ SynchronizedQueue(Queue<E> queue)
+ {
+ this.queue = queue;
+ }
+
+ public synchronized E element()
+ {
+ return queue.element();
+ }
+
+ public synchronized boolean offer(E o)
+ {
+ return queue.offer(o);
+ }
+
+ public synchronized E peek()
+ {
+ return queue.peek();
+ }
+
+ public synchronized E poll()
+ {
+ return queue.poll();
+ }
+
+ public synchronized E remove()
+ {
+ return queue.remove();
+ }
+
+ public synchronized int size()
+ {
+ return queue.size();
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ public synchronized boolean contains(Object o)
+ {
+ return queue.contains(o);
+ }
+
+ public synchronized Iterator<E> iterator()
+ {
+ return queue.iterator();
+ }
+
+ public synchronized Object[] toArray()
+ {
+ return queue.toArray();
+ }
+
+ public synchronized <T>T[] toArray(T[] a)
+ {
+ return queue.toArray(a);
+ }
+
+ public synchronized boolean add(E o)
+ {
+ return queue.add(o);
+ }
+
+ public synchronized boolean remove(Object o)
+ {
+ return queue.remove(o);
+ }
+
+ public synchronized boolean containsAll(Collection<?> c)
+ {
+ return queue.containsAll(c);
+ }
+
+ public synchronized boolean addAll(Collection<? extends E> c)
+ {
+ return queue.addAll(c);
+ }
+
+ public synchronized boolean removeAll(Collection<?> c)
+ {
+ return queue.removeAll(c);
+ }
+
+ public synchronized boolean retainAll(Collection<?> c)
+ {
+ return queue.retainAll(c);
+ }
+
+ public synchronized void clear()
+ {
+ queue.clear();
+ }
+ }
+
+ static void run(String label, AveragedRun test) throws Exception
+ {
+ RunStats stats = test.call();
+ System.out.println((label == null ? "" : label + ", ") + test
+ + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin());
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
+ }
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
new file mode 100644
index 0000000000..6490b9f270
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.MockIoSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SendPerfTest extends TimedRun
+{
+ private int _messages = 1000;
+ private int _clients = 10;
+ private List<AMQQueue> _queues;
+
+ public SendPerfTest(int clients, int messages)
+ {
+ super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
+ _messages = messages;
+ _clients = clients;
+ }
+
+ protected void setup() throws Exception
+ {
+ _queues = initQueues(_clients);
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ deliver(_messages, _queues);
+ }
+
+ //have a dummy AMQProtocolSession that does nothing on the writeFrame()
+ //set up x number of queues
+ //create necessary bits and pieces to deliver a message
+ //deliver y messages to each queue
+
+ public static void main(String[] argv) throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ int clients = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
+ test.run();
+ }
+
+ /**
+ * Delivers messages to a number of queues.
+ * @param count the number of messages to deliver
+ * @param queues the list of queues
+ * @throws NoConsumersException
+ */
+ static void deliver(int count, List<AMQQueue> queues) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.exchange = new NullExchange().getName();
+ ContentHeaderBody header = new ContentHeaderBody();
+ List<ContentBody> body = new ArrayList<ContentBody>();
+ MessageStore messageStore = new SkeletonMessageStore();
+ body.add(new ContentBody());
+ for (int i = 0; i < count; i++)
+ {
+ for (AMQQueue q : queues)
+ {
+ q.deliver(new AMQMessage(messageStore, i, publish, header, body));
+ }
+ }
+ }
+
+ static List<AMQQueue> initQueues(int number) throws AMQException
+ {
+ Exchange exchange = new NullExchange();
+ List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
+ for (int i = 0; i < number; i++)
+ {
+ AMQQueue q = createQueue("Queue" + (i + 1));
+ q.bind("routingKey", exchange);
+ try
+ {
+ q.registerProtocolSession(createSession(), 1, "1", false);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error creating protocol session: " + e, e);
+ }
+ queues.add(q);
+ }
+ return queues;
+ }
+
+ static AMQQueue createQueue(String name) throws AMQException
+ {
+ return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
+ new OnCurrentThreadExecutor());
+ }
+
+ static AMQProtocolSession createSession() throws Exception
+ {
+ IApplicationRegistry reg = ApplicationRegistry.getInstance();
+ AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+ AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
+ result.addChannel(new AMQChannel(1, null, null));
+ return result;
+ }
+
+ static class NullExchange extends AbstractExchange
+ {
+ public String getName()
+ {
+ return "NullExchange";
+ }
+
+ protected ExchangeMBean createMBean()
+ {
+ return null;
+ }
+
+ public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
new file mode 100644
index 0000000000..f8db90850f
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class SubscriptionManagerTest extends TestCase
+{
+ private final SubscriptionSet mgr = new SubscriptionSet();
+
+ public void testBasicSubscriptionManagement()
+ {
+ assertTrue(mgr.isEmpty());
+ assertFalse(mgr.hasActiveSubscribers());
+ TestSubscription s1 = new TestSubscription("S1");
+ mgr.addSubscriber(s1);
+ assertFalse(mgr.isEmpty());
+ assertTrue(mgr.hasActiveSubscribers());
+
+ TestSubscription s2 = new TestSubscription("S2");
+ mgr.addSubscriber(s2);
+
+ s2.setSuspended(true);
+ assertFalse(mgr.isEmpty());
+ assertTrue(mgr.hasActiveSubscribers());
+ assertTrue(s2.isSuspended());
+ assertFalse(s1.isSuspended());
+
+ s1.setSuspended(true);
+ assertFalse(mgr.hasActiveSubscribers());
+
+ mgr.removeSubscriber(new TestSubscription("S1"));
+ assertFalse(mgr.isEmpty());
+ mgr.removeSubscriber(new TestSubscription("S2"));
+ assertTrue(mgr.isEmpty());
+ }
+
+ public void testRoundRobin()
+ {
+ TestSubscription a = new TestSubscription("A");
+ TestSubscription b = new TestSubscription("B");
+ TestSubscription c = new TestSubscription("C");
+ TestSubscription d = new TestSubscription("D");
+ mgr.addSubscriber(a);
+ mgr.addSubscriber(b);
+ mgr.addSubscriber(c);
+ mgr.addSubscriber(d);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(a, mgr.nextSubscriber(null));
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(c, mgr.nextSubscriber(null));
+ assertEquals(d, mgr.nextSubscriber(null));
+ }
+
+ c.setSuspended(true);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(a, mgr.nextSubscriber(null));
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(d, mgr.nextSubscriber(null));
+ }
+
+ mgr.removeSubscriber(a);
+ d.setSuspended(true);
+ c.setSuspended(false);
+ Subscription e = new TestSubscription("D");
+ mgr.addSubscriber(e);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertEquals(b, mgr.nextSubscriber(null));
+ assertEquals(c, mgr.nextSubscriber(null));
+ assertEquals(e, mgr.nextSubscriber(null));
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(SubscriptionManagerTest.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
new file mode 100644
index 0000000000..4969b5589a
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class SubscriptionSetTest extends TestCase
+{
+ /**
+ * A SubscriptionSet that counts the number of items scanned.
+ */
+ static class TestSubscriptionSet extends SubscriptionSet
+ {
+ private int scanned = 0;
+
+ void resetScanned()
+ {
+ scanned = 0;
+ }
+
+ protected void subscriberScanned()
+ {
+ ++scanned;
+ }
+
+ int getScanned()
+ {
+ return scanned;
+ }
+ }
+
+ final TestSubscription sub1 = new TestSubscription("1");
+ final TestSubscription sub2 = new TestSubscription("2");
+ final TestSubscription sub3 = new TestSubscription("3");
+
+ final TestSubscription suspendedSub1 = new TestSubscription("sus1", true);
+ final TestSubscription suspendedSub2 = new TestSubscription("sus2", true);
+ final TestSubscription suspendedSub3 = new TestSubscription("sus3", true);
+
+ public void testNextMessage()
+ {
+ SubscriptionSet ss = new SubscriptionSet();
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(0, ss.getCurrentSubscriber());
+
+ ss.addSubscriber(sub1);
+ assertEquals(sub1, ss.nextSubscriber(null));
+ assertEquals(1, ss.getCurrentSubscriber());
+ assertEquals(sub1, ss.nextSubscriber(null));
+ assertEquals(1, ss.getCurrentSubscriber());
+
+ ss.addSubscriber(sub2);
+ ss.addSubscriber(sub3);
+
+ assertEquals(sub2, ss.nextSubscriber(null));
+ assertEquals(2, ss.getCurrentSubscriber());
+
+ assertEquals(sub3, ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ }
+
+ public void testNextMessageWhenAllSuspended()
+ {
+ SubscriptionSet ss = createAllSuspendedSubscriptionSet();
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ }
+
+ private TestSubscriptionSet createAllSuspendedSubscriptionSet()
+ {
+ TestSubscriptionSet ss = new TestSubscriptionSet();
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(suspendedSub2);
+ ss.addSubscriber(suspendedSub3);
+ return ss;
+ }
+
+ public void testNextMessageAfterRemove()
+ {
+ SubscriptionSet ss = new SubscriptionSet();
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(suspendedSub2);
+ ss.addSubscriber(sub3);
+ assertEquals(sub3, ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+
+ assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1));
+
+ assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here.
+ assertEquals(2, ss.getCurrentSubscriber());
+ }
+
+ public void testNextMessageOverScanning()
+ {
+ TestSubscriptionSet ss = new TestSubscriptionSet();
+ TestSubscription sub = new TestSubscription("test");
+ ss.addSubscriber(suspendedSub1);
+ ss.addSubscriber(sub);
+ ss.addSubscriber(suspendedSub3);
+ assertEquals(sub, ss.nextSubscriber(null));
+ assertEquals(2, ss.getCurrentSubscriber());
+ assertEquals(2, ss.getScanned());
+
+ ss.resetScanned();
+ sub.setSuspended(true);
+ assertNull(ss.nextSubscriber(null));
+ assertEquals(3, ss.getCurrentSubscriber());
+ // Current implementation overscans by one item here.
+ assertEquals(ss.size() + 1, ss.getScanned());
+ }
+
+ public void testNextMessageOverscanWorstCase() {
+ TestSubscriptionSet ss = createAllSuspendedSubscriptionSet();
+ ss.nextSubscriber(null);
+ // Scans the subscriptions twice.
+ assertEquals(ss.size() * 2, ss.getScanned());
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(SubscriptionSetTest.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
new file mode 100644
index 0000000000..b49166d1ce
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
+import org.apache.qpid.server.queue.DeliveryManagerTest;
+import org.apache.qpid.AMQException;
+
+import junit.framework.TestSuite;
+
+public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
+{
+ public SynchronizedDeliveryManagerTest() throws Exception
+ {
+ try
+ {
+ System.setProperty("concurrentdeliverymanager","false");
+ _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ throw new AMQException("Could not initialise delivery manager", t);
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
+ return suite;
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
new file mode 100644
index 0000000000..de6df4bc03
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSubscription implements Subscription
+{
+ private final List<AMQMessage> messages;
+ private final Object key;
+ private boolean isSuspended;
+
+ public TestSubscription(Object key)
+ {
+ this(key, new ArrayList<AMQMessage>());
+ }
+
+ public TestSubscription(final Object key, final boolean isSuspended)
+ {
+ this(key);
+ setSuspended(isSuspended);
+ }
+
+ TestSubscription(Object key, List<AMQMessage> messages)
+ {
+ this.key = key;
+ this.messages = messages;
+ }
+
+ List<AMQMessage> getMessages()
+ {
+ return messages;
+ }
+
+ public void send(AMQMessage msg, AMQQueue queue)
+ {
+ messages.add(msg);
+ }
+
+ public void setSuspended(boolean suspended)
+ {
+ isSuspended = suspended;
+ }
+
+ public boolean isSuspended()
+ {
+ return isSuspended;
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ }
+
+ public int hashCode()
+ {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key);
+ }
+
+ public String toString()
+ {
+ return key.toString();
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
new file mode 100644
index 0000000000..bc0a8a7d64
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A message store that does nothing. Designed to be used in tests that do not want to use any message store
+ * functionality.
+ */
+public class SkeletonMessageStore implements MessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+
+ public void configure(String base, Configuration config) throws Exception
+ {
+ }
+
+ public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void put(AMQMessage msg)
+ {
+ }
+
+ public void removeMessage(long messageId)
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void removeQueue(String name) throws AMQException
+ {
+ }
+
+ public void enqueueMessage(String name, long messageId) throws AMQException
+ {
+ }
+
+ public void dequeueMessage(String name, long messageId) throws AMQException
+ {
+ }
+
+ public void beginTran() throws AMQException
+ {
+ }
+
+ public boolean inTran()
+ {
+ return false;
+ }
+
+ public void commitTran() throws AMQException
+ {
+ }
+
+ public void abortTran() throws AMQException
+ {
+ }
+
+ public List<AMQQueue> createQueues() throws AMQException
+ {
+ return null;
+ }
+
+ public long getNewMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
new file mode 100644
index 0000000000..f162506fed
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests that reference counting works correctly with AMQMessage and the message store
+ */
+public class TestReferenceCounting extends TestCase
+{
+ private TestableMemoryMessageStore _store;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _store = new TestableMemoryMessageStore();
+ }
+
+ /**
+ * Check that when the reference count is decremented the message removes itself from the store
+ */
+ public void testMessageGetsRemoved() throws AMQException
+ {
+ AMQMessage message = new AMQMessage(_store, null);
+ _store.put(message);
+ assertTrue(_store.getMessageMap().size() == 1);
+ message.decrementReference();
+ assertTrue(_store.getMessageMap().size() == 0);
+ }
+
+ public void testMessageRemains() throws AMQException
+ {
+ AMQMessage message = new AMQMessage(_store, null);
+ _store.put(message);
+ assertTrue(_store.getMessageMap().size() == 1);
+ message.incrementReference();
+ message.decrementReference();
+ assertTrue(_store.getMessageMap().size() == 1);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestReferenceCounting.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
new file mode 100644
index 0000000000..be7687a22c
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+ public TestableMemoryMessageStore()
+ {
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>();
+ }
+
+ public ConcurrentMap<Long, AMQMessage> getMessageMap()
+ {
+ return _messageMap;
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
new file mode 100644
index 0000000000..ac5c60a931
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -0,0 +1,295 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.LinkedList;
+
+import junit.framework.TestCase;
+
+public class TxnBufferTest extends TestCase
+{
+ private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
+
+ public void testCommit() throws AMQException
+ {
+ MockStore store = new MockStore();
+
+ TxnBuffer buffer = new TxnBuffer(store);
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ //check relative ordering
+ MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
+ buffer.enlist(op);
+ buffer.enlist(op);
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+
+ buffer.commit();
+
+ validateOps();
+ store.validate();
+ }
+
+ public void testRollback() throws AMQException
+ {
+ MockStore store = new MockStore();
+
+ TxnBuffer buffer = new TxnBuffer(store);
+ buffer.enlist(new MockOp().expectRollback());
+ buffer.enlist(new MockOp().expectRollback());
+ buffer.enlist(new MockOp().expectRollback());
+
+ buffer.rollback();
+
+ validateOps();
+ store.validate();
+ }
+
+ public void testCommitWithFailureDuringPrepare() throws AMQException
+ {
+ MockStore store = new MockStore();
+ store.expectBegin().expectAbort();
+
+ TxnBuffer buffer = new TxnBuffer(store);
+ buffer.containsPersistentChanges();
+ buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+ buffer.enlist(new TxnTester(store));
+ buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+ buffer.enlist(new FailedPrepare());
+ buffer.enlist(new MockOp());
+
+ buffer.commit();
+ validateOps();
+ store.validate();
+ }
+
+ public void testCommitWithPersistance() throws AMQException
+ {
+ MockStore store = new MockStore();
+ store.expectBegin().expectCommit();
+
+ TxnBuffer buffer = new TxnBuffer(store);
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new TxnTester(store));
+ buffer.containsPersistentChanges();
+
+ buffer.commit();
+ validateOps();
+ store.validate();
+ }
+
+ private void validateOps()
+ {
+ for (MockOp op : ops)
+ {
+ op.validate();
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TxnBufferTest.class);
+ }
+
+ class MockOp implements TxnOp
+ {
+ final Object PREPARE = "PREPARE";
+ final Object COMMIT = "COMMIT";
+ final Object UNDO_PREPARE = "UNDO_PREPARE";
+ final Object ROLLBACK = "ROLLBACK";
+
+ private final LinkedList expected = new LinkedList();
+
+ MockOp()
+ {
+ ops.add(this);
+ }
+
+ public void prepare()
+ {
+ assertEquals(expected.removeLast(), PREPARE);
+ }
+
+ public void commit()
+ {
+ assertEquals(expected.removeLast(), COMMIT);
+ }
+
+ public void undoPrepare()
+ {
+ assertEquals(expected.removeLast(), UNDO_PREPARE);
+ }
+
+ public void rollback()
+ {
+ assertEquals(expected.removeLast(), ROLLBACK);
+ }
+
+ private MockOp expect(Object optype)
+ {
+ expected.addFirst(optype);
+ return this;
+ }
+
+ MockOp expectPrepare()
+ {
+ return expect(PREPARE);
+ }
+
+ MockOp expectCommit()
+ {
+ return expect(COMMIT);
+ }
+
+ MockOp expectUndoPrepare()
+ {
+ return expect(UNDO_PREPARE);
+ }
+
+ MockOp expectRollback()
+ {
+ return expect(ROLLBACK);
+ }
+
+ void validate()
+ {
+ assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+ }
+
+ void clear()
+ {
+ expected.clear();
+ }
+ }
+
+ class MockStore extends TestableMemoryMessageStore
+ {
+ final Object BEGIN = "BEGIN";
+ final Object ABORT = "ABORT";
+ final Object COMMIT = "COMMIT";
+
+ private final LinkedList expected = new LinkedList();
+ private boolean inTran;
+
+ public void beginTran() throws AMQException
+ {
+ assertEquals(expected.removeLast(), BEGIN);
+ inTran = true;
+ }
+
+ public void commitTran() throws AMQException
+ {
+ assertEquals(expected.removeLast(), COMMIT);
+ inTran = false;
+ }
+
+ public void abortTran() throws AMQException
+ {
+ assertEquals(expected.removeLast(), ABORT);
+ inTran = false;
+ }
+
+ public boolean inTran()
+ {
+ return inTran;
+ }
+
+ private MockStore expect(Object optype)
+ {
+ expected.addFirst(optype);
+ return this;
+ }
+
+ MockStore expectBegin()
+ {
+ return expect(BEGIN);
+ }
+
+ MockStore expectCommit()
+ {
+ return expect(COMMIT);
+ }
+
+ MockStore expectAbort()
+ {
+ return expect(ABORT);
+ }
+
+ void clear()
+ {
+ expected.clear();
+ }
+
+ void validate()
+ {
+ assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+ }
+ }
+
+ class NullOp implements TxnOp
+ {
+ public void prepare() throws AMQException
+ {
+ }
+ public void commit()
+ {
+ }
+ public void undoPrepare()
+ {
+ }
+ public void rollback()
+ {
+ }
+ }
+
+ class FailedPrepare extends NullOp
+ {
+ public void prepare() throws AMQException
+ {
+ throw new AMQException("Fail!");
+ }
+ }
+
+ class TxnTester extends NullOp
+ {
+ private final MessageStore store;
+
+ TxnTester(MessageStore store)
+ {
+ this.store = store;
+ }
+
+ public void prepare() throws AMQException
+ {
+ assertTrue("Expected prepare to be performed under txn", store.inTran());
+ }
+
+ public void commit()
+ {
+ assertTrue("Expected commit not to be performed under txn", !store.inTran());
+ }
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
new file mode 100644
index 0000000000..1d17985ab5
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.concurrent.Callable;
+import java.util.Collection;
+
+public class AveragedRun implements Callable<RunStats>
+{
+ private final RunStats stats = new RunStats();
+ private final TimedRun test;
+ private final int iterations;
+
+ public AveragedRun(TimedRun test, int iterations)
+ {
+ this.test = test;
+ this.iterations = iterations;
+ }
+
+ public RunStats call() throws Exception
+ {
+ for (int i = 0; i < iterations; i++)
+ {
+ stats.record(test.call());
+ }
+ return stats;
+ }
+
+ public void run() throws Exception
+ {
+ System.out.println(test + ": " + call());
+ }
+
+ public String toString()
+ {
+ return test.toString();
+ }
+
+ static void run(Collection<AveragedRun> tests) throws Exception
+ {
+ for(AveragedRun test : tests)
+ {
+ test.run();
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java
new file mode 100644
index 0000000000..1ae8d3205d
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class ConcurrentTest extends TimedRun
+{
+ private final TimedRun _test;
+ private final Thread[] _threads;
+
+ public ConcurrentTest(TimedRun test, int threads)
+ {
+ super(test.toString());
+ _test = test;
+ _threads = new Thread[threads];
+ }
+
+ protected void setup() throws Exception
+ {
+ _test.setup();
+ for(int i = 0; i < _threads.length; i++)
+ {
+ _threads[i] = new Thread(new Runner());
+ }
+ }
+
+ protected void teardown() throws Exception
+ {
+ _test.teardown();
+ }
+
+ protected void run() throws Exception
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ private Exception error;
+
+ public void run()
+ {
+ try
+ {
+ _test.run();
+ }
+ catch(Exception e)
+ {
+ error = e;
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
new file mode 100644
index 0000000000..ec67fc68b3
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class RunStats
+{
+ private long min = Long.MAX_VALUE;
+ private long max;
+ private long total;
+ private int count;
+
+ public void record(long time)
+ {
+ max = Math.max(time, max);
+ min = Math.min(time, min);
+ total += time;
+ count++;
+ }
+
+ public long getMin()
+ {
+ return min;
+ }
+
+ public long getMax()
+ {
+ return max;
+ }
+
+ public long getAverage()
+ {
+ return total / count;
+ }
+
+ public String toString()
+ {
+ return "avg=" + getAverage() + ", min=" + min + ", max=" + max;
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
new file mode 100644
index 0000000000..f801daf27c
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.NullAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+
+import java.util.HashMap;
+
+public class TestApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private ManagedObjectRegistry _managedObjectRegistry;
+
+ private AuthenticationManager _authenticationManager;
+
+ private MessageStore _messageStore;
+
+ public TestApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+ _queueRegistry = new DefaultQueueRegistry();
+ _exchangeFactory = new DefaultExchangeFactory();
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _authenticationManager = new NullAuthenticationManager();
+ _messageStore = new TestableMemoryMessageStore();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public Configuration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+}
+
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
new file mode 100644
index 0000000000..1291380311
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.Callable;
+
+public abstract class TimedRun implements Callable<Long>
+{
+ private final String description;
+
+ public TimedRun(String description)
+ {
+ this.description = description;
+ }
+
+ public Long call() throws Exception
+ {
+ setup();
+ long start = System.currentTimeMillis();
+ run();
+ long stop = System.currentTimeMillis();
+ teardown();
+ return stop - start;
+ }
+
+ public String toString()
+ {
+ return description;
+ }
+
+ protected void setup() throws Exception{}
+ protected void teardown() throws Exception{}
+ protected abstract void run() throws Exception;
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java b/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
new file mode 100644
index 0000000000..e859fac4af
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.qpid.client.transport.TransportConnection;
+
+public class VMBrokerSetup extends TestSetup
+{
+ public VMBrokerSetup(Test t)
+ {
+ super(t);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
new file mode 100644
index 0000000000..a3e555aac9
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.test.VMBrokerSetup;
+
+import javax.jms.*;
+
+import junit.framework.TestCase;
+
+public class DisconnectAndRedeliverTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+ DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+ * the channel is closed.
+ *
+ * @throws Exception
+ */
+ public void testDisconnectRedeliversMessages() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg2", tm.getText());
+
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg3", tm.getText());
+
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm.getText());
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+
+ con.close();
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting third consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting fourth consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receive(3000);
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+
+ _logger.info("Actually:" + store.getMessageMap().size());
+ // assertTrue(store.getMessageMap().size() == 0);
+ }
+
+ /**
+ * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+ * requeued (due perhaps to the queue being deleted).
+ *
+ * @throws Exception
+ */
+ public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
+ {
+
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ _logger.info("Actually:" + store.getMessageMap().size());
+ assertTrue(store.getMessageMap().size() == 0);
+ con.close();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class));
+ }
+}