diff options
Diffstat (limited to 'M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java')
-rw-r--r-- | M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java | 230 |
1 files changed, 0 insertions, 230 deletions
diff --git a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java deleted file mode 100644 index 658cf26135..0000000000 --- a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.basic; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.test.utils.QpidTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class MultipleConnectionTest extends QpidTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(MultipleConnectionTest.class); - - public static final String _defaultBroker = "vm://:1"; - public String _connectionString = _defaultBroker; - - private class Receiver - { - private AMQConnection _connection; - private Session[] _sessions; - private MessageCounter[] _counters; - - Receiver(String broker, AMQDestination dest, int sessions) throws Exception - { - this((AMQConnection) getConnection("guest", "guest"), dest, sessions); - } - - Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception - { - _connection = connection; - _sessions = new AMQSession[sessions]; - _counters = new MessageCounter[sessions]; - for (int i = 0; i < sessions; i++) - { - _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - _counters[i] = new MessageCounter(_sessions[i].toString()); - _sessions[i].createConsumer(dest).setMessageListener(_counters[i]); - } - - _connection.start(); - } - - void close() throws JMSException - { - _connection.close(); - } - } - - private class Publisher - { - private AMQConnection _connection; - private Session _session; - private MessageProducer _producer; - - Publisher(String broker, AMQDestination dest) throws Exception - { - this((AMQConnection) getConnection("guest", "guest"), dest); - } - - Publisher(AMQConnection connection, AMQDestination dest) throws Exception - { - _connection = connection; - _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - _producer = _session.createProducer(dest); - } - - void send(String msg) throws JMSException - { - _producer.send(_session.createTextMessage(msg)); - } - - void close() throws JMSException - { - _connection.close(); - } - } - - private static class MessageCounter implements MessageListener - { - private final String _name; - private int _count; - - MessageCounter(String name) - { - _name = name; - } - - public synchronized void onMessage(Message message) - { - _count++; - notify(); - } - - synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException - { - long start = System.currentTimeMillis(); - while (expected > _count) - { - long timeLeft = maxWait - timeSince(start); - if (timeLeft < 0) - { - break; - } - - wait(timeLeft); - } - - return expected <= _count; - } - - private long timeSince(long start) - { - return System.currentTimeMillis() - start; - } - - public synchronized String toString() - { - return _name + ": " + _count; - } - } - - protected void setUp() throws Exception - { - super.setUp(); - TransportConnection.createVMBroker(1); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - TransportConnection.killAllVMBrokers(); - } - - private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException - { - for (int i = 0; i < receivers.length; i++) - { - waitForCompletion(expected, wait, receivers[i]._counters); - } - } - - private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException - { - for (int i = 0; i < counters.length; i++) - { - if (!counters[i].waitUntil(expected, wait)) - { - throw new RuntimeException("Expected: " + expected + " got " + counters[i]); - } - } - } - - private static String randomize(String in) - { - return in + System.currentTimeMillis(); - } - - public static void main(String[] argv) throws Exception - { - String broker = (argv.length > 0) ? argv[0] : _defaultBroker; - - MultipleConnectionTest test = new MultipleConnectionTest(); - test._connectionString = broker; - test.test(); - } - - public void test() throws Exception - { - String broker = _connectionString; - int messages = 10; - - AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "amq.topic"); - - Receiver[] receivers = new Receiver[] { new Receiver(broker, topic, 2), new Receiver(broker, topic, 14) }; - - Publisher publisher = new Publisher(broker, topic); - for (int i = 0; i < messages; i++) - { - publisher.send("Message " + (i + 1)); - } - - try - { - waitForCompletion(messages, 5000, receivers); - _logger.info("All receivers received all expected messages"); - } - finally - { - publisher.close(); - for (int i = 0; i < receivers.length; i++) - { - receivers[i].close(); - } - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MultipleConnectionTest.class); - } -} |