summaryrefslogtreecommitdiff
path: root/java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java')
-rw-r--r--java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java247
1 files changed, 247 insertions, 0 deletions
diff --git a/java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
new file mode 100644
index 0000000000..1b2108b928
--- /dev/null
+++ b/java/java/client/test/src/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.jms.*;
+
+/**
+ * This is a slow test.
+ */
+public class MultipleConnectionTest
+{
+ public static String _connectionString = "vm://:1";
+
+ private static class Receiver
+ {
+ private AMQConnection _connection;
+ private Session[] _sessions;
+ private MessageCounter[] _counters;
+
+ Receiver(String broker, AMQDestination dest, int sessions) throws Exception
+ {
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions);
+ }
+
+ Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
+ {
+ _connection = connection;
+ _sessions = new AMQSession[sessions];
+ _counters = new MessageCounter[sessions];
+ for (int i = 0; i < sessions; i++)
+ {
+ _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _counters[i] = new MessageCounter(_sessions[i].toString());
+ _sessions[i].createConsumer(dest).setMessageListener(_counters[i]);
+ }
+ _connection.start();
+ }
+
+ void close() throws JMSException
+ {
+ _connection.close();
+ }
+ }
+
+ private static class Publisher
+ {
+ private AMQConnection _connection;
+ private Session _session;
+ private MessageProducer _producer;
+
+ Publisher(String broker, AMQDestination dest) throws Exception
+ {
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest);
+ }
+
+ Publisher(AMQConnection connection, AMQDestination dest) throws Exception
+ {
+ _connection = connection;
+ _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _producer = _session.createProducer(dest);
+ }
+
+ void send(String msg) throws JMSException
+ {
+ _producer.send(_session.createTextMessage(msg));
+ }
+
+ void close() throws JMSException
+ {
+ _connection.close();
+ }
+ }
+
+ private static class MessageCounter implements MessageListener
+ {
+ private final String _name;
+ private int _count;
+
+ MessageCounter(String name)
+ {
+ _name = name;
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ _count++;
+ notifyAll();
+ }
+
+ synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException
+ {
+ long start = System.currentTimeMillis();
+ long timeLeft = maxWait;
+ do
+ {
+ wait(timeLeft);
+ timeLeft = maxWait - timeSince(start);
+ }
+ while (expected > _count && timeLeft > 0);
+ return expected <= _count;
+ }
+
+ private long timeSince(long start)
+ {
+ return System.currentTimeMillis() - start;
+ }
+
+ public synchronized String toString()
+ {
+ return _name + ": " + _count;
+ }
+ }
+
+ private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
+ {
+ for (int i = 0; i < receivers.length; i++)
+ {
+ waitForCompletion(expected, wait, receivers[i]._counters);
+ }
+ }
+
+ private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException
+ {
+ for (int i = 0; i < counters.length; i++)
+ {
+ if (!counters[i].waitUntil(expected, wait))
+ {
+ throw new RuntimeException("Expected: " + expected + " got " + counters[i]);
+ }
+ ;
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String broker = argv.length > 0 ? argv[0] : "vm://:1";
+
+ int connections = 7;
+ int sessions = 2;
+
+ MultipleConnectionTest test = new MultipleConnectionTest();
+ test._connectionString = broker;
+ test.test();
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ String broker = _connectionString;
+ int messages = 10;
+
+ AMQTopic topic = new AMQTopic("amq.topic");
+
+ /*
+ Receiver[] receivers = new Receiver[connections];
+ for(int i = 0; i < receivers.length; i++)
+ {
+ receivers[i] = new Receiver(broker, topic, sessions);
+ }
+ */
+
+ Receiver[] receivers = new Receiver[]{
+ new Receiver(broker, topic, 2),
+ new Receiver(broker, topic, 14)
+ };
+
+ Publisher publisher = new Publisher(broker, topic);
+ for (int i = 0; i < messages; i++)
+ {
+ publisher.send("Message " + (i + 1));
+ }
+
+ try
+ {
+ waitForCompletion(messages, 5000, receivers);
+ System.out.println("All receivers received all expected messages");
+ }
+ finally
+ {
+ publisher.close();
+ for (int i = 0; i < receivers.length; i++)
+ {
+ receivers[i].close();
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void createVMBroker() throws Exception
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ Assert.fail("Unable to create broker: " + e);
+ }
+ }
+
+ @AfterClass
+ public static void stopVmBroker()
+ {
+ TransportConnection.killVMBroker(1);
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(MultipleConnectionTest.class);
+ }
+}