diff options
10 files changed, 1285 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..cac0064785 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; + +import javax.jms.ConnectionFactory; + +class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) + { + return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path"); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java new file mode 100644 index 0000000000..14db74438f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public abstract class AbstractConfig +{ + public boolean setOptions(String[] argv) + { + try + { + for(int i = 0; i < argv.length - 1; i += 2) + { + String key = argv[i]; + String value = argv[i+1]; + setOption(key, value); + } + return true; + } + catch(Exception e) + { + System.out.println(e.getMessage()); + } + return false; + } + + protected int parseInt(String msg, String i) + { + try + { + return Integer.parseInt(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i, e); + } + } + + protected long parseLong(String msg, String i) + { + try + { + return Long.parseLong(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i, e); + } + } + + public abstract void setOption(String key, String value); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..a9984eb09a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +public interface ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException; +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java new file mode 100644 index 0000000000..ff2377f087 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +public class Connector +{ + public Connection createConnection(ConnectorConfig config) throws Exception + { + return getConnectionFactory(config).createConnection(); + } + + ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception + { + String factory = config.getFactory(); + if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName(); + System.out.println("Using " + factory); + return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java new file mode 100644 index 0000000000..b120ed3f12 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public interface ConnectorConfig +{ + public String getHost(); + public int getPort(); + public String getFactory(); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..a0248a8f79 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.client.JMSAMQException; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.MBeanException; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.naming.NameNotFoundException; +import java.util.Hashtable; + +public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException + { + ConnectionFactory cf = null; + InitialContext ic = null; + Hashtable ht = new Hashtable(); + ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); + String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01"); + String jbossPort = System.getProperty("jboss.port", "1099"); + ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort); + ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); + + try + { + ic = new InitialContext(ht); + if (!doesDestinationExist("topictest.messages", ic)) + { + deployTopic("topictest.messages", ic); + } + if (!doesDestinationExist("topictest.control", ic)) + { + deployTopic("topictest.control", ic); + } + + cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); + return cf; + } + catch (NamingException e) + { + throw new JMSAMQException("Unable to lookup object: " + e, e); + } + catch (Exception e) + { + throw new JMSAMQException("Error creating topic: " + e, e); + } + } + + private boolean doesDestinationExist(String name, InitialContext ic) throws Exception + { + try + { + ic.lookup("/" + name); + } + catch (NameNotFoundException e) + { + return false; + } + return true; + } + + private void deployTopic(String name, InitialContext ic) throws Exception + { + MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic); + + ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer"); + + String jndiName = "/" + name; + try + { + mBeanServer.invoke(serverObjectName, "createTopic", + new Object[]{name, jndiName}, + new String[]{"java.lang.String", "java.lang.String"}); + } + catch (MBeanException e) + { + System.err.println("Error: " + e); + System.err.println("Cause: " + e.getCause()); + } + } + + private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException + { + return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor"); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java new file mode 100755 index 0000000000..d5c0979399 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -0,0 +1,326 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.config.Connector; +import org.apache.qpid.config.AbstractConfig; + +import javax.jms.Connection; + +public class Config extends AbstractConfig implements ConnectorConfig +{ + + private String host = "localhost"; + private int port = 5672; + private String factory = null; + + private int payload = 256; + private int messages = 1000; + private int clients = 1; + private int batch = 1; + private long delay = 1; + private int warmup; + private int ackMode= AMQSession.NO_ACKNOWLEDGE; + private String clientId; + private String subscriptionId; + private String selector; + private String destinationName; + private boolean persistent; + private boolean transacted; + private int destinationsCount; + private int batchSize; + private int rate; + private boolean ispubsub; + private long timeout; + + public Config() + { + } + + public int getAckMode() + { + return ackMode; + } + + public void setPayload(int payload) + { + this.payload = payload; + } + + public int getPayload() + { + return payload; + } + + void setClients(int clients) + { + this.clients = clients; + } + + int getClients() + { + return clients; + } + + void setMessages(int messages) + { + this.messages = messages; + } + + public int getMessages() + { + return messages; + } + + public int getBatchSize() + { + return batchSize; + } + + public int getRate() + { + return rate; + } + + public int getDestinationsCount() + { + return destinationsCount; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public String getFactory() + { + return factory; + } + + public void setPort(int port) + { + this.port = port; + } + + int getBatch() + { + return batch; + } + + void setBatch(int batch) + { + this.batch = batch; + } + + int getWarmup() + { + return warmup; + } + + void setWarmup(int warmup) + { + this.warmup = warmup; + } + + public long getDelay() + { + return delay; + } + + public void setDelay(long delay) + { + this.delay = delay; + } + + public long getTimeout() + { + return timeout; + } + + public void setTimeout(long time) + { + this.timeout = time; + } + + public String getClientId() + { + return clientId; + } + + public String getSubscriptionId() + { + return subscriptionId; + } + + public String getSelector() + { + return selector; + } + + public String getDestination() + { + return destinationName; + } + + public boolean usePersistentMessages() + { + return persistent; + } + + public boolean isTransacted() + { + return transacted; + } + + public boolean isPubSub() + { + return ispubsub; + } + + public void setOption(String key, String value) + { + if("-host".equalsIgnoreCase(key)) + { + setHost(value); + } + else if("-port".equalsIgnoreCase(key)) + { + try + { + setPort(Integer.parseInt(value)); + } + catch(NumberFormatException e) + { + throw new RuntimeException("Bad port number: " + value, e); + } + } + else if("-payload".equalsIgnoreCase(key)) + { + setPayload(parseInt("Bad payload size", value)); + } + else if("-messages".equalsIgnoreCase(key)) + { + setMessages(parseInt("Bad message count", value)); + } + else if("-clients".equalsIgnoreCase(key)) + { + setClients(parseInt("Bad client count", value)); + } + else if("-batch".equalsIgnoreCase(key)) + { + setBatch(parseInt("Bad batch count", value)); + } + else if("-delay".equalsIgnoreCase(key)) + { + setDelay(parseLong("Bad batch delay", value)); + } + else if("-warmup".equalsIgnoreCase(key)) + { + setWarmup(parseInt("Bad warmup count", value)); + } + else if("-ack".equalsIgnoreCase(key)) + { + ackMode = parseInt("Bad ack mode", value); + } + else if("-factory".equalsIgnoreCase(key)) + { + factory = value; + } + else if("-clientId".equalsIgnoreCase(key)) + { + clientId = value; + } + else if("-subscriptionId".equalsIgnoreCase(key)) + { + subscriptionId = value; + } + else if("-persistent".equalsIgnoreCase(key)) + { + persistent = "true".equalsIgnoreCase(value); + } + else if("-transacted".equalsIgnoreCase(key)) + { + transacted = "true".equalsIgnoreCase(value); + } + else if ("-destinationscount".equalsIgnoreCase(key)) + { + destinationsCount = parseInt("Bad destinations count", value); + } + else if ("-batchsize".equalsIgnoreCase(key)) + { + batchSize = parseInt("Bad batch size", value); + } + else if ("-rate".equalsIgnoreCase(key)) + { + rate = parseInt("MEssage rate", value); + } + else if("-pubsub".equalsIgnoreCase(key)) + { + ispubsub = "true".equalsIgnoreCase(value); + } + else if("-selector".equalsIgnoreCase(key)) + { + selector = value; + } + else if("-destinationname".equalsIgnoreCase(key)) + { + destinationName = value; + } + else if("-timeout".equalsIgnoreCase(key)) + { + setTimeout(parseLong("Bad timeout data", value)); + } + else + { + System.out.println("Ignoring unrecognised option: " + key); + } + } + + static String getAckModeDescription(int ackMode) + { + switch(ackMode) + { + case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE"; + case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE"; + case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE"; + case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE"; + case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE"; + } + return "AckMode=" + ackMode; + } + + public Connection createConnection() throws Exception + { + return new Connector().createConnection(this); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java new file mode 100755 index 0000000000..6dcea42bfe --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for + * cross testing the java and cpp clients. + * + * <p/>How the cpp topic_publisher operates: + * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for + * the specified number of test messages to be sent. + * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", + * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The + * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message + * about the number of messages received and how long it took, although the publisher never looks at the message content. + * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", + * which the listener should close its connection and terminate upon receipt of. + * + * @todo I've added lots of field table types in the report message, just to check if the other end can decode them + * correctly. Not really the right place to test this, so remove them from + * {@link #createReportResponseMessage(String)} once a better test exists. + */ +public class Listener implements MessageListener +{ + private static Logger log = Logger.getLogger(Listener.class); + + public static final String CONTROL_TOPIC = "topic_control"; + public static final String RESPONSE_QUEUE = "response"; + + private final Topic _topic; + //private final Topic _control; + + private final Queue _response; + + /** Holds the connection to listen on. */ + private final Connection _connection; + + /** Holds the producer to send control messages on. */ + private final MessageProducer _controller; + + /** Holds the JMS session. */ + private final javax.jms.Session _session; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ + private boolean init; + + /** Holds the count of messages received by this listener. */ + private int count; + + /** Used to hold the start time of the first message. */ + private long start; + private static String clientId; + + Listener(Connection connection, int ackMode, String name) throws Exception + { + log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name + + "): called"); + + _connection = connection; + _session = connection.createSession(false, ackMode); + + if (_session instanceof AMQSession) + { + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC); + //_control = new AMQTopic(CONTROL_TOPIC); + _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE); + } + else + { + _topic = _session.createTopic(CONTROL_TOPIC); + //_control = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + } + + //register for events + if (name == null) + { + log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); + createTopicConsumer().setMessageListener(this); + } + else + { + log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); + createDurableTopicConsumer(name).setMessageListener(this); + } + + _connection.start(); + + _controller = createControlPublisher(); + System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) + + + ((name == null) + ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) + + "..."); + } + + public static void main(String[] argv) throws Exception + { + clientId = "Listener-" + System.currentTimeMillis(); + + NDC.push(clientId); + + Config config = new Config(); + config.setOptions(argv); + + //Connection con = config.createConnection(); + Connection con = + new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() + + "'"); + + if (config.getClientId() != null) + { + con.setClientID(config.getClientId()); + } + + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + + NDC.pop(); + NDC.remove(); + } + + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException + { + log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + log.debug("comp = " + comp); + + boolean result = (comp != null) && comp.equals(value); + log.debug("result = " + result); + + return result; + } + + public void onMessage(Message message) + { + NDC.push(clientId); + + log.debug("public void onMessage(Message message = " + message + "): called"); + + if (!init) + { + start = System.nanoTime() / 1000000; + count = 0; + init = true; + } + + try + { + if (isShutdown(message)) + { + log.debug("Got a shutdown message."); + shutdown(); + } + else if (isReport(message)) + { + log.debug("Got a report request message."); + + // Send the report. + report(); + init = false; + } + } + catch (JMSException e) + { + log.warn("There was a JMSException during onMessage.", e); + } + finally + { + NDC.pop(); + } + } + + Message createReportResponseMessage(String msg) throws JMSException + { + Message message = _session.createTextMessage(msg); + + // Shove some more field table type in the message just to see if the other end can handle it. + message.setBooleanProperty("BOOLEAN", true); + message.setByteProperty("BYTE", (byte) 5); + message.setDoubleProperty("DOUBLE", Math.PI); + message.setFloatProperty("FLOAT", 1.0f); + message.setIntProperty("INT", 1); + message.setShortProperty("SHORT", (short) 1); + message.setLongProperty("LONG", (long) 1827361278); + message.setStringProperty("STRING", "hello"); + + return message; + } + + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + //log.debug("isShutdown = " + result); + + return result; + } + + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + //log.debug("isReport = " + result); + + return result; + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_response); + } + + private void shutdown() + { + try + { + _session.close(); + _connection.stop(); + _connection.close(); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + log.debug("private void report(): called"); + + try + { + String msg = getReport(); + _controller.send(createReportResponseMessage(msg)); + log.debug("Sent report: " + msg); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = ((System.nanoTime() / 1000000) - start); + + return "Received " + count + " in " + time + "ms"; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java new file mode 100755 index 0000000000..4efdc1cb56 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.*; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + */ +class MessageFactory +{ + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + private final Session _session; + private final Topic _topic; + private final Topic _control; + private final byte[] _payload; + + MessageFactory(Session session) throws JMSException + { + this(session, 256); + } + + MessageFactory(Session session, int size) throws JMSException + { + _session = session; + if (session instanceof AMQSession) + { + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control"); + _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control"); + } + else + { + _topic = session.createTopic("topic_control"); + _control = session.createTopic("topictest.control"); + } + + _payload = new byte[size]; + + for (int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return false; + } + } + + Topic getTopic() + { + return _topic; + } + + Message createEventMessage() throws JMSException + { + BytesMessage msg = _session.createBytesMessage(); + msg.writeBytes(_payload); + + return msg; + } + + Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return e.toString(); + } + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageConsumer createControlConsumer() throws Exception + { + return _session.createConsumer(_control); + } + + MessageProducer createTopicPublisher() throws Exception + { + return _session.createProducer(_topic); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_control); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java new file mode 100755 index 0000000000..c3b19b558a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.*; + +public class Publisher implements MessageListener +{ + private final Object _lock = new Object(); + private final Connection _connection; + private final Session _session; + private final MessageFactory _factory; + private final MessageProducer _publisher; + private int _count; + + Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception + { + _connection = connection; + _session = _connection.createSession(false, ackMode); + _factory = new MessageFactory(_session, size); + _publisher = _factory.createTopicPublisher(); + _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + "."); + } + + private void test(Config config) throws Exception + { + test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup()); + } + + private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception + { + _factory.createControlConsumer().setMessageListener(this); + _connection.start(); + + if (warmup > 0) + { + System.out.println("Runing warmup (" + warmup + " msgs)"); + long time = batch(warmup, consumerCount); + System.out.println("Warmup completed in " + time + "ms"); + } + + long[] times = new long[batches]; + for (int i = 0; i < batches; i++) + { + if (i > 0) + { + Thread.sleep(delay * 1000); + } + times[i] = batch(msgCount, consumerCount); + System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms."); + } + + long min = min(times); + long max = max(times); + System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); + + //request shutdown + _publisher.send(_factory.createShutdownMessage()); + + _connection.stop(); + _connection.close(); + } + + private long batch(int msgCount, int consumerCount) throws Exception + { + _count = consumerCount; + long start = System.currentTimeMillis(); + publish(msgCount); + waitForCompletion(consumerCount); + return System.currentTimeMillis() - start; + } + + private void publish(int count) throws Exception + { + + //send events + for (int i = 0; i < count; i++) + { + _publisher.send(_factory.createEventMessage()); + if ((i + 1) % 100 == 0) + { + System.out.println("Sent " + (i + 1) + " messages"); + } + } + + //request report + _publisher.send(_factory.createReportRequestMessage()); + } + + private void waitForCompletion(int consumers) throws Exception + { + System.out.println("Waiting for completion..."); + synchronized (_lock) + { + while (_count > 0) + { + _lock.wait(); + } + } + } + + + public void onMessage(Message message) + { + System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining"); + if (_count == 0) + { + synchronized (_lock) + { + _lock.notify(); + } + } + } + + static long min(long[] times) + { + long min = times.length > 0 ? times[0] : 0; + for (int i = 0; i < times.length; i++) + { + min = Math.min(min, times[i]); + } + return min; + } + + static long max(long[] times) + { + long max = times.length > 0 ? times[0] : 0; + for (int i = 0; i < times.length; i++) + { + max = Math.max(max, times[i]); + } + return max; + } + + static long avg(long[] times, long min, long max) + { + long sum = 0; + for (int i = 0; i < times.length; i++) + { + sum += times[i]; + } + + int adjustment = 0; + + // Remove min and max if we have run enough batches. + if (times.length > 2) + { + sum -= min; + sum -= max; + adjustment = 2; + } + + return (sum / (times.length - adjustment)); + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + int size = config.getPayload(); + int ackMode = config.getAckMode(); + boolean persistent = config.usePersistentMessages(); + new Publisher(con, size, ackMode, persistent).test(config); + } +} |