diff options
Diffstat (limited to 'qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java')
-rw-r--r-- | qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java new file mode 100644 index 0000000000..b0cde22349 --- /dev/null +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java @@ -0,0 +1,167 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.fragmentation; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.Session; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.log4j.Logger; + +import javax.jms.*; +import java.net.InetAddress; + +public class TestLargeSubscriber +{ + private static final Logger _logger = Logger.getLogger(TestLargeSubscriber.class); + + private static MessageProducer _destinationProducer; + + private static String _destinationName; + + public static void main(String[] args) + { + _logger.info("Starting..."); + + final String host; + final int port; + final String username; + final String password; + final String virtualPath; + final int numExpectedMessages; + if (args.length == 0) + { + host = "localhost"; + port = 5672; + username = "guest"; + password = "guest"; + virtualPath = "/test"; + numExpectedMessages = 100; + } + else if (args.length == 6) + { + host = args[0]; + port = Integer.parseInt(args[1]); + username = args[2]; + password = args[3]; + virtualPath = args[4]; + numExpectedMessages = Integer.parseInt(args[5]); + } + else + { + System.out.println("Usage: host port username password virtual-path expectedMessageCount"); + System.exit(1); + throw new RuntimeException("cannot be reached"); + } + + try + { + InetAddress address = InetAddress.getLocalHost(); + AMQConnection con = new AMQConnection(host, port, username, password, + address.getHostName(), virtualPath); + final AMQSession session = (AMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final int expectedMessageCount = numExpectedMessages; + + MessageConsumer consumer = session.createConsumer(new AMQTopic(session.getDefaultTopicExchangeName(), + new AMQShortString("large")), + 100, true, false, null); + + consumer.setMessageListener(new MessageListener() + { + private int _messageCount; + + private long _startTime = 0; + + public void onMessage(Message message) + { + validateMessage(message); + if (_messageCount++ == 0) + { + _startTime = System.currentTimeMillis(); + } + if (_logger.isInfoEnabled()) + { + _logger.info("Got message '" + message + "'"); + } + if (_messageCount == expectedMessageCount) + { + long totalTime = System.currentTimeMillis() - _startTime; + _logger.error("Total time to receive " + _messageCount + " messages was " + + totalTime + "ms. Rate is " + (_messageCount/(totalTime/1000.0))); + } + } + + private void validateMessage(Message message) + { + if (!(message instanceof BytesMessage)) + { + _logger.error("Message is not of correct type - should be BytesMessage and is " + + message.getClass()); + } + BytesMessage bm = (BytesMessage) message; + final int expectedSize = 1024 * 187; // 187k + try + { + if (bm.getBodyLength() != expectedSize) + { + _logger.error("Message is not correct length - should be " + expectedSize + " and is " + + bm.getBodyLength()); + } + } + catch (JMSException e) + { + _logger.error("Failed to validate message: " + e, e); + } + try + { + byte[] data = new byte[(int)bm.getBodyLength()]; + bm.readBytes(data); + for (int i = 0; i < data.length; i++) + { + if (data[i] != (byte)(i%25)) + { + _logger.error("byte " + i + " of message is wrong - should be " + i%25 + " but is " + + data[i]); + } + } + _logger.info("***** Validated message successfully"); + } + catch (JMSException e) + { + _logger.error("Failed to validate message: " + e, e); + } + } + }); + con.start(); + } + catch (Throwable t) + { + System.err.println("Fatal error: " + t); + t.printStackTrace(); + } + + System.out.println("Waiting..."); + } +} + |