summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java')
-rw-r--r--M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java230
1 files changed, 0 insertions, 230 deletions
diff --git a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
deleted file mode 100644
index 658cf26135..0000000000
--- a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.basic;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class MultipleConnectionTest extends QpidTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MultipleConnectionTest.class);
-
- public static final String _defaultBroker = "vm://:1";
- public String _connectionString = _defaultBroker;
-
- private class Receiver
- {
- private AMQConnection _connection;
- private Session[] _sessions;
- private MessageCounter[] _counters;
-
- Receiver(String broker, AMQDestination dest, int sessions) throws Exception
- {
- this((AMQConnection) getConnection("guest", "guest"), dest, sessions);
- }
-
- Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
- {
- _connection = connection;
- _sessions = new AMQSession[sessions];
- _counters = new MessageCounter[sessions];
- for (int i = 0; i < sessions; i++)
- {
- _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- _counters[i] = new MessageCounter(_sessions[i].toString());
- _sessions[i].createConsumer(dest).setMessageListener(_counters[i]);
- }
-
- _connection.start();
- }
-
- void close() throws JMSException
- {
- _connection.close();
- }
- }
-
- private class Publisher
- {
- private AMQConnection _connection;
- private Session _session;
- private MessageProducer _producer;
-
- Publisher(String broker, AMQDestination dest) throws Exception
- {
- this((AMQConnection) getConnection("guest", "guest"), dest);
- }
-
- Publisher(AMQConnection connection, AMQDestination dest) throws Exception
- {
- _connection = connection;
- _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- _producer = _session.createProducer(dest);
- }
-
- void send(String msg) throws JMSException
- {
- _producer.send(_session.createTextMessage(msg));
- }
-
- void close() throws JMSException
- {
- _connection.close();
- }
- }
-
- private static class MessageCounter implements MessageListener
- {
- private final String _name;
- private int _count;
-
- MessageCounter(String name)
- {
- _name = name;
- }
-
- public synchronized void onMessage(Message message)
- {
- _count++;
- notify();
- }
-
- synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException
- {
- long start = System.currentTimeMillis();
- while (expected > _count)
- {
- long timeLeft = maxWait - timeSince(start);
- if (timeLeft < 0)
- {
- break;
- }
-
- wait(timeLeft);
- }
-
- return expected <= _count;
- }
-
- private long timeSince(long start)
- {
- return System.currentTimeMillis() - start;
- }
-
- public synchronized String toString()
- {
- return _name + ": " + _count;
- }
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- TransportConnection.createVMBroker(1);
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- TransportConnection.killAllVMBrokers();
- }
-
- private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
- {
- for (int i = 0; i < receivers.length; i++)
- {
- waitForCompletion(expected, wait, receivers[i]._counters);
- }
- }
-
- private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException
- {
- for (int i = 0; i < counters.length; i++)
- {
- if (!counters[i].waitUntil(expected, wait))
- {
- throw new RuntimeException("Expected: " + expected + " got " + counters[i]);
- }
- }
- }
-
- private static String randomize(String in)
- {
- return in + System.currentTimeMillis();
- }
-
- public static void main(String[] argv) throws Exception
- {
- String broker = (argv.length > 0) ? argv[0] : _defaultBroker;
-
- MultipleConnectionTest test = new MultipleConnectionTest();
- test._connectionString = broker;
- test.test();
- }
-
- public void test() throws Exception
- {
- String broker = _connectionString;
- int messages = 10;
-
- AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "amq.topic");
-
- Receiver[] receivers = new Receiver[] { new Receiver(broker, topic, 2), new Receiver(broker, topic, 14) };
-
- Publisher publisher = new Publisher(broker, topic);
- for (int i = 0; i < messages; i++)
- {
- publisher.send("Message " + (i + 1));
- }
-
- try
- {
- waitForCompletion(messages, 5000, receivers);
- _logger.info("All receivers received all expected messages");
- }
- finally
- {
- publisher.close();
- for (int i = 0; i < receivers.length; i++)
- {
- receivers[i].close();
- }
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(MultipleConnectionTest.class);
- }
-}