diff options
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java')
-rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java | 303 |
1 files changed, 0 insertions, 303 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java deleted file mode 100644 index 6dcea42bfe..0000000000 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.topic; - -import java.util.Random; - -import javax.jms.*; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for - * cross testing the java and cpp clients. - * - * <p/>How the cpp topic_publisher operates: - * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for - * the specified number of test messages to be sent. - * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", - * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The - * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message - * about the number of messages received and how long it took, although the publisher never looks at the message content. - * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", - * which the listener should close its connection and terminate upon receipt of. - * - * @todo I've added lots of field table types in the report message, just to check if the other end can decode them - * correctly. Not really the right place to test this, so remove them from - * {@link #createReportResponseMessage(String)} once a better test exists. - */ -public class Listener implements MessageListener -{ - private static Logger log = Logger.getLogger(Listener.class); - - public static final String CONTROL_TOPIC = "topic_control"; - public static final String RESPONSE_QUEUE = "response"; - - private final Topic _topic; - //private final Topic _control; - - private final Queue _response; - - /** Holds the connection to listen on. */ - private final Connection _connection; - - /** Holds the producer to send control messages on. */ - private final MessageProducer _controller; - - /** Holds the JMS session. */ - private final javax.jms.Session _session; - - /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ - private boolean init; - - /** Holds the count of messages received by this listener. */ - private int count; - - /** Used to hold the start time of the first message. */ - private long start; - private static String clientId; - - Listener(Connection connection, int ackMode, String name) throws Exception - { - log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name - + "): called"); - - _connection = connection; - _session = connection.createSession(false, ackMode); - - if (_session instanceof AMQSession) - { - _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC); - //_control = new AMQTopic(CONTROL_TOPIC); - _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE); - } - else - { - _topic = _session.createTopic(CONTROL_TOPIC); - //_control = _session.createTopic(CONTROL_TOPIC); - _response = _session.createQueue(RESPONSE_QUEUE); - } - - //register for events - if (name == null) - { - log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); - createTopicConsumer().setMessageListener(this); - } - else - { - log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); - createDurableTopicConsumer(name).setMessageListener(this); - } - - _connection.start(); - - _controller = createControlPublisher(); - System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) - + - ((name == null) - ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) - + "..."); - } - - public static void main(String[] argv) throws Exception - { - clientId = "Listener-" + System.currentTimeMillis(); - - NDC.push(clientId); - - Config config = new Config(); - config.setOptions(argv); - - //Connection con = config.createConnection(); - Connection con = - new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() - + "'"); - - if (config.getClientId() != null) - { - con.setClientID(config.getClientId()); - } - - new Listener(con, config.getAckMode(), config.getSubscriptionId()); - - NDC.pop(); - NDC.remove(); - } - - /** - * Checks whether or not a text field on a message has the specified value. - * - * @param m The message to check. - * @param fieldName The name of the field to check. - * @param value The expected value of the field to compare with. - * - * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException - { - log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName - + ", String value = " + value + "): called"); - - String comp = m.getStringProperty(fieldName); - log.debug("comp = " + comp); - - boolean result = (comp != null) && comp.equals(value); - log.debug("result = " + result); - - return result; - } - - public void onMessage(Message message) - { - NDC.push(clientId); - - log.debug("public void onMessage(Message message = " + message + "): called"); - - if (!init) - { - start = System.nanoTime() / 1000000; - count = 0; - init = true; - } - - try - { - if (isShutdown(message)) - { - log.debug("Got a shutdown message."); - shutdown(); - } - else if (isReport(message)) - { - log.debug("Got a report request message."); - - // Send the report. - report(); - init = false; - } - } - catch (JMSException e) - { - log.warn("There was a JMSException during onMessage.", e); - } - finally - { - NDC.pop(); - } - } - - Message createReportResponseMessage(String msg) throws JMSException - { - Message message = _session.createTextMessage(msg); - - // Shove some more field table type in the message just to see if the other end can handle it. - message.setBooleanProperty("BOOLEAN", true); - message.setByteProperty("BYTE", (byte) 5); - message.setDoubleProperty("DOUBLE", Math.PI); - message.setFloatProperty("FLOAT", 1.0f); - message.setIntProperty("INT", 1); - message.setShortProperty("SHORT", (short) 1); - message.setLongProperty("LONG", (long) 1827361278); - message.setStringProperty("STRING", "hello"); - - return message; - } - - boolean isShutdown(Message m) throws JMSException - { - boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); - - //log.debug("isShutdown = " + result); - - return result; - } - - boolean isReport(Message m) throws JMSException - { - boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); - - //log.debug("isReport = " + result); - - return result; - } - - MessageConsumer createTopicConsumer() throws Exception - { - return _session.createConsumer(_topic); - } - - MessageConsumer createDurableTopicConsumer(String name) throws Exception - { - return _session.createDurableSubscriber(_topic, name); - } - - MessageProducer createControlPublisher() throws Exception - { - return _session.createProducer(_response); - } - - private void shutdown() - { - try - { - _session.close(); - _connection.stop(); - _connection.close(); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - private void report() - { - log.debug("private void report(): called"); - - try - { - String msg = getReport(); - _controller.send(createReportResponseMessage(msg)); - log.debug("Sent report: " + msg); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - private String getReport() - { - long time = ((System.nanoTime() / 1000000) - start); - - return "Received " + count + " in " + time + "ms"; - } -} |