summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-02-13 10:09:49 +0000
committerAidan Skinner <aidan@apache.org>2008-02-13 10:09:49 +0000
commiteff51b9febb3dc299e206ac339a84f28d5e96531 (patch)
treef578b3dd635891d3040d539c0a1932447bacf9ea
parentb11c1b17314029a12a1372dd78fa4cfa8b74c2c6 (diff)
downloadqpid-python-eff51b9febb3dc299e206ac339a84f28d5e96531.tar.gz
Import some extra perftest bits from M2.1
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@627337 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java35
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java69
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java29
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java40
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java28
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java112
-rwxr-xr-xqpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java326
-rwxr-xr-xqpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java303
-rwxr-xr-xqpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java157
-rwxr-xr-xqpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java186
10 files changed, 1285 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..cac0064785
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+
+import javax.jms.ConnectionFactory;
+
+class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config)
+ {
+ return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path");
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
new file mode 100644
index 0000000000..14db74438f
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public abstract class AbstractConfig
+{
+ public boolean setOptions(String[] argv)
+ {
+ try
+ {
+ for(int i = 0; i < argv.length - 1; i += 2)
+ {
+ String key = argv[i];
+ String value = argv[i+1];
+ setOption(key, value);
+ }
+ return true;
+ }
+ catch(Exception e)
+ {
+ System.out.println(e.getMessage());
+ }
+ return false;
+ }
+
+ protected int parseInt(String msg, String i)
+ {
+ try
+ {
+ return Integer.parseInt(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i, e);
+ }
+ }
+
+ protected long parseLong(String msg, String i)
+ {
+ try
+ {
+ return Long.parseLong(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i, e);
+ }
+ }
+
+ public abstract void setOption(String key, String value);
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..a9984eb09a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+public interface ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException;
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java
new file mode 100644
index 0000000000..ff2377f087
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+public class Connector
+{
+ public Connection createConnection(ConnectorConfig config) throws Exception
+ {
+ return getConnectionFactory(config).createConnection();
+ }
+
+ ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception
+ {
+ String factory = config.getFactory();
+ if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName();
+ System.out.println("Using " + factory);
+ return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java
new file mode 100644
index 0000000000..b120ed3f12
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public interface ConnectorConfig
+{
+ public String getHost();
+ public int getPort();
+ public String getFactory();
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..a0248a8f79
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.client.JMSAMQException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.MBeanException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.NameNotFoundException;
+import java.util.Hashtable;
+
+public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException
+ {
+ ConnectionFactory cf = null;
+ InitialContext ic = null;
+ Hashtable ht = new Hashtable();
+ ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01");
+ String jbossPort = System.getProperty("jboss.port", "1099");
+ ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort);
+ ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
+
+ try
+ {
+ ic = new InitialContext(ht);
+ if (!doesDestinationExist("topictest.messages", ic))
+ {
+ deployTopic("topictest.messages", ic);
+ }
+ if (!doesDestinationExist("topictest.control", ic))
+ {
+ deployTopic("topictest.control", ic);
+ }
+
+ cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
+ return cf;
+ }
+ catch (NamingException e)
+ {
+ throw new JMSAMQException("Unable to lookup object: " + e, e);
+ }
+ catch (Exception e)
+ {
+ throw new JMSAMQException("Error creating topic: " + e, e);
+ }
+ }
+
+ private boolean doesDestinationExist(String name, InitialContext ic) throws Exception
+ {
+ try
+ {
+ ic.lookup("/" + name);
+ }
+ catch (NameNotFoundException e)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private void deployTopic(String name, InitialContext ic) throws Exception
+ {
+ MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
+
+ ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer");
+
+ String jndiName = "/" + name;
+ try
+ {
+ mBeanServer.invoke(serverObjectName, "createTopic",
+ new Object[]{name, jndiName},
+ new String[]{"java.lang.String", "java.lang.String"});
+ }
+ catch (MBeanException e)
+ {
+ System.err.println("Error: " + e);
+ System.err.println("Cause: " + e.getCause());
+ }
+ }
+
+ private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException
+ {
+ return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor");
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
new file mode 100755
index 0000000000..d5c0979399
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -0,0 +1,326 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.Connector;
+import org.apache.qpid.config.AbstractConfig;
+
+import javax.jms.Connection;
+
+public class Config extends AbstractConfig implements ConnectorConfig
+{
+
+ private String host = "localhost";
+ private int port = 5672;
+ private String factory = null;
+
+ private int payload = 256;
+ private int messages = 1000;
+ private int clients = 1;
+ private int batch = 1;
+ private long delay = 1;
+ private int warmup;
+ private int ackMode= AMQSession.NO_ACKNOWLEDGE;
+ private String clientId;
+ private String subscriptionId;
+ private String selector;
+ private String destinationName;
+ private boolean persistent;
+ private boolean transacted;
+ private int destinationsCount;
+ private int batchSize;
+ private int rate;
+ private boolean ispubsub;
+ private long timeout;
+
+ public Config()
+ {
+ }
+
+ public int getAckMode()
+ {
+ return ackMode;
+ }
+
+ public void setPayload(int payload)
+ {
+ this.payload = payload;
+ }
+
+ public int getPayload()
+ {
+ return payload;
+ }
+
+ void setClients(int clients)
+ {
+ this.clients = clients;
+ }
+
+ int getClients()
+ {
+ return clients;
+ }
+
+ void setMessages(int messages)
+ {
+ this.messages = messages;
+ }
+
+ public int getMessages()
+ {
+ return messages;
+ }
+
+ public int getBatchSize()
+ {
+ return batchSize;
+ }
+
+ public int getRate()
+ {
+ return rate;
+ }
+
+ public int getDestinationsCount()
+ {
+ return destinationsCount;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getFactory()
+ {
+ return factory;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ int getBatch()
+ {
+ return batch;
+ }
+
+ void setBatch(int batch)
+ {
+ this.batch = batch;
+ }
+
+ int getWarmup()
+ {
+ return warmup;
+ }
+
+ void setWarmup(int warmup)
+ {
+ this.warmup = warmup;
+ }
+
+ public long getDelay()
+ {
+ return delay;
+ }
+
+ public void setDelay(long delay)
+ {
+ this.delay = delay;
+ }
+
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(long time)
+ {
+ this.timeout = time;
+ }
+
+ public String getClientId()
+ {
+ return clientId;
+ }
+
+ public String getSubscriptionId()
+ {
+ return subscriptionId;
+ }
+
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public String getDestination()
+ {
+ return destinationName;
+ }
+
+ public boolean usePersistentMessages()
+ {
+ return persistent;
+ }
+
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public boolean isPubSub()
+ {
+ return ispubsub;
+ }
+
+ public void setOption(String key, String value)
+ {
+ if("-host".equalsIgnoreCase(key))
+ {
+ setHost(value);
+ }
+ else if("-port".equalsIgnoreCase(key))
+ {
+ try
+ {
+ setPort(Integer.parseInt(value));
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException("Bad port number: " + value, e);
+ }
+ }
+ else if("-payload".equalsIgnoreCase(key))
+ {
+ setPayload(parseInt("Bad payload size", value));
+ }
+ else if("-messages".equalsIgnoreCase(key))
+ {
+ setMessages(parseInt("Bad message count", value));
+ }
+ else if("-clients".equalsIgnoreCase(key))
+ {
+ setClients(parseInt("Bad client count", value));
+ }
+ else if("-batch".equalsIgnoreCase(key))
+ {
+ setBatch(parseInt("Bad batch count", value));
+ }
+ else if("-delay".equalsIgnoreCase(key))
+ {
+ setDelay(parseLong("Bad batch delay", value));
+ }
+ else if("-warmup".equalsIgnoreCase(key))
+ {
+ setWarmup(parseInt("Bad warmup count", value));
+ }
+ else if("-ack".equalsIgnoreCase(key))
+ {
+ ackMode = parseInt("Bad ack mode", value);
+ }
+ else if("-factory".equalsIgnoreCase(key))
+ {
+ factory = value;
+ }
+ else if("-clientId".equalsIgnoreCase(key))
+ {
+ clientId = value;
+ }
+ else if("-subscriptionId".equalsIgnoreCase(key))
+ {
+ subscriptionId = value;
+ }
+ else if("-persistent".equalsIgnoreCase(key))
+ {
+ persistent = "true".equalsIgnoreCase(value);
+ }
+ else if("-transacted".equalsIgnoreCase(key))
+ {
+ transacted = "true".equalsIgnoreCase(value);
+ }
+ else if ("-destinationscount".equalsIgnoreCase(key))
+ {
+ destinationsCount = parseInt("Bad destinations count", value);
+ }
+ else if ("-batchsize".equalsIgnoreCase(key))
+ {
+ batchSize = parseInt("Bad batch size", value);
+ }
+ else if ("-rate".equalsIgnoreCase(key))
+ {
+ rate = parseInt("MEssage rate", value);
+ }
+ else if("-pubsub".equalsIgnoreCase(key))
+ {
+ ispubsub = "true".equalsIgnoreCase(value);
+ }
+ else if("-selector".equalsIgnoreCase(key))
+ {
+ selector = value;
+ }
+ else if("-destinationname".equalsIgnoreCase(key))
+ {
+ destinationName = value;
+ }
+ else if("-timeout".equalsIgnoreCase(key))
+ {
+ setTimeout(parseLong("Bad timeout data", value));
+ }
+ else
+ {
+ System.out.println("Ignoring unrecognised option: " + key);
+ }
+ }
+
+ static String getAckModeDescription(int ackMode)
+ {
+ switch(ackMode)
+ {
+ case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE";
+ case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE";
+ case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE";
+ case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE";
+ case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE";
+ }
+ return "AckMode=" + ackMode;
+ }
+
+ public Connection createConnection() throws Exception
+ {
+ return new Connector().createConnection(this);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
new file mode 100755
index 0000000000..6dcea42bfe
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+/**
+ * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for
+ * cross testing the java and cpp clients.
+ *
+ * <p/>How the cpp topic_publisher operates:
+ * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for
+ * the specified number of test messages to be sent.
+ * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST",
+ * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The
+ * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message
+ * about the number of messages received and how long it took, although the publisher never looks at the message content.
+ * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST",
+ * which the listener should close its connection and terminate upon receipt of.
+ *
+ * @todo I've added lots of field table types in the report message, just to check if the other end can decode them
+ * correctly. Not really the right place to test this, so remove them from
+ * {@link #createReportResponseMessage(String)} once a better test exists.
+ */
+public class Listener implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ public static final String CONTROL_TOPIC = "topic_control";
+ public static final String RESPONSE_QUEUE = "response";
+
+ private final Topic _topic;
+ //private final Topic _control;
+
+ private final Queue _response;
+
+ /** Holds the connection to listen on. */
+ private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
+ private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
+ private final javax.jms.Session _session;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
+ private boolean init;
+
+ /** Holds the count of messages received by this listener. */
+ private int count;
+
+ /** Used to hold the start time of the first message. */
+ private long start;
+ private static String clientId;
+
+ Listener(Connection connection, int ackMode, String name) throws Exception
+ {
+ log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name
+ + "): called");
+
+ _connection = connection;
+ _session = connection.createSession(false, ackMode);
+
+ if (_session instanceof AMQSession)
+ {
+ _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC);
+ //_control = new AMQTopic(CONTROL_TOPIC);
+ _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE);
+ }
+ else
+ {
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ //_control = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+ }
+
+ //register for events
+ if (name == null)
+ {
+ log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)");
+ createTopicConsumer().setMessageListener(this);
+ }
+ else
+ {
+ log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)");
+ createDurableTopicConsumer(name).setMessageListener(this);
+ }
+
+ _connection.start();
+
+ _controller = createControlPublisher();
+ System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode)
+ +
+ ((name == null)
+ ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")"))
+ + "...");
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ clientId = "Listener-" + System.currentTimeMillis();
+
+ NDC.push(clientId);
+
+ Config config = new Config();
+ config.setOptions(argv);
+
+ //Connection con = config.createConnection();
+ Connection con =
+ new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort()
+ + "'");
+
+ if (config.getClientId() != null)
+ {
+ con.setClientID(config.getClientId());
+ }
+
+ new Listener(con, config.getAckMode(), config.getSubscriptionId());
+
+ NDC.pop();
+ NDC.remove();
+ }
+
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
+ {
+ log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+ log.debug("comp = " + comp);
+
+ boolean result = (comp != null) && comp.equals(value);
+ log.debug("result = " + result);
+
+ return result;
+ }
+
+ public void onMessage(Message message)
+ {
+ NDC.push(clientId);
+
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ if (!init)
+ {
+ start = System.nanoTime() / 1000000;
+ count = 0;
+ init = true;
+ }
+
+ try
+ {
+ if (isShutdown(message))
+ {
+ log.debug("Got a shutdown message.");
+ shutdown();
+ }
+ else if (isReport(message))
+ {
+ log.debug("Got a report request message.");
+
+ // Send the report.
+ report();
+ init = false;
+ }
+ }
+ catch (JMSException e)
+ {
+ log.warn("There was a JMSException during onMessage.", e);
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ Message message = _session.createTextMessage(msg);
+
+ // Shove some more field table type in the message just to see if the other end can handle it.
+ message.setBooleanProperty("BOOLEAN", true);
+ message.setByteProperty("BYTE", (byte) 5);
+ message.setDoubleProperty("DOUBLE", Math.PI);
+ message.setFloatProperty("FLOAT", 1.0f);
+ message.setIntProperty("INT", 1);
+ message.setShortProperty("SHORT", (short) 1);
+ message.setLongProperty("LONG", (long) 1827361278);
+ message.setStringProperty("STRING", "hello");
+
+ return message;
+ }
+
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ //log.debug("isShutdown = " + result);
+
+ return result;
+ }
+
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ //log.debug("isReport = " + result);
+
+ return result;
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_response);
+ }
+
+ private void shutdown()
+ {
+ try
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ log.debug("private void report(): called");
+
+ try
+ {
+ String msg = getReport();
+ _controller.send(createReportResponseMessage(msg));
+ log.debug("Sent report: " + msg);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport()
+ {
+ long time = ((System.nanoTime() / 1000000) - start);
+
+ return "Received " + count + " in " + time + "ms";
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
new file mode 100755
index 0000000000..4efdc1cb56
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.*;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+/**
+ */
+class MessageFactory
+{
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ private final Session _session;
+ private final Topic _topic;
+ private final Topic _control;
+ private final byte[] _payload;
+
+ MessageFactory(Session session) throws JMSException
+ {
+ this(session, 256);
+ }
+
+ MessageFactory(Session session, int size) throws JMSException
+ {
+ _session = session;
+ if (session instanceof AMQSession)
+ {
+ _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control");
+ _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control");
+ }
+ else
+ {
+ _topic = session.createTopic("topic_control");
+ _control = session.createTopic("topictest.control");
+ }
+
+ _payload = new byte[size];
+
+ for (int i = 0; i < size; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return false;
+ }
+ }
+
+ Topic getTopic()
+ {
+ return _topic;
+ }
+
+ Message createEventMessage() throws JMSException
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ msg.writeBytes(_payload);
+
+ return msg;
+ }
+
+ Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return e.toString();
+ }
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageConsumer createControlConsumer() throws Exception
+ {
+ return _session.createConsumer(_control);
+ }
+
+ MessageProducer createTopicPublisher() throws Exception
+ {
+ return _session.createProducer(_topic);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_control);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
new file mode 100755
index 0000000000..c3b19b558a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.*;
+
+public class Publisher implements MessageListener
+{
+ private final Object _lock = new Object();
+ private final Connection _connection;
+ private final Session _session;
+ private final MessageFactory _factory;
+ private final MessageProducer _publisher;
+ private int _count;
+
+ Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception
+ {
+ _connection = connection;
+ _session = _connection.createSession(false, ackMode);
+ _factory = new MessageFactory(_session, size);
+ _publisher = _factory.createTopicPublisher();
+ _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + ".");
+ }
+
+ private void test(Config config) throws Exception
+ {
+ test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
+ }
+
+ private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception
+ {
+ _factory.createControlConsumer().setMessageListener(this);
+ _connection.start();
+
+ if (warmup > 0)
+ {
+ System.out.println("Runing warmup (" + warmup + " msgs)");
+ long time = batch(warmup, consumerCount);
+ System.out.println("Warmup completed in " + time + "ms");
+ }
+
+ long[] times = new long[batches];
+ for (int i = 0; i < batches; i++)
+ {
+ if (i > 0)
+ {
+ Thread.sleep(delay * 1000);
+ }
+ times[i] = batch(msgCount, consumerCount);
+ System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
+ }
+
+ long min = min(times);
+ long max = max(times);
+ System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
+
+ //request shutdown
+ _publisher.send(_factory.createShutdownMessage());
+
+ _connection.stop();
+ _connection.close();
+ }
+
+ private long batch(int msgCount, int consumerCount) throws Exception
+ {
+ _count = consumerCount;
+ long start = System.currentTimeMillis();
+ publish(msgCount);
+ waitForCompletion(consumerCount);
+ return System.currentTimeMillis() - start;
+ }
+
+ private void publish(int count) throws Exception
+ {
+
+ //send events
+ for (int i = 0; i < count; i++)
+ {
+ _publisher.send(_factory.createEventMessage());
+ if ((i + 1) % 100 == 0)
+ {
+ System.out.println("Sent " + (i + 1) + " messages");
+ }
+ }
+
+ //request report
+ _publisher.send(_factory.createReportRequestMessage());
+ }
+
+ private void waitForCompletion(int consumers) throws Exception
+ {
+ System.out.println("Waiting for completion...");
+ synchronized (_lock)
+ {
+ while (_count > 0)
+ {
+ _lock.wait();
+ }
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
+ if (_count == 0)
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+ }
+
+ static long min(long[] times)
+ {
+ long min = times.length > 0 ? times[0] : 0;
+ for (int i = 0; i < times.length; i++)
+ {
+ min = Math.min(min, times[i]);
+ }
+ return min;
+ }
+
+ static long max(long[] times)
+ {
+ long max = times.length > 0 ? times[0] : 0;
+ for (int i = 0; i < times.length; i++)
+ {
+ max = Math.max(max, times[i]);
+ }
+ return max;
+ }
+
+ static long avg(long[] times, long min, long max)
+ {
+ long sum = 0;
+ for (int i = 0; i < times.length; i++)
+ {
+ sum += times[i];
+ }
+
+ int adjustment = 0;
+
+ // Remove min and max if we have run enough batches.
+ if (times.length > 2)
+ {
+ sum -= min;
+ sum -= max;
+ adjustment = 2;
+ }
+
+ return (sum / (times.length - adjustment));
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setOptions(argv);
+
+ Connection con = config.createConnection();
+ int size = config.getPayload();
+ int ackMode = config.getAckMode();
+ boolean persistent = config.usePersistentMessages();
+ new Publisher(con, size, ackMode, persistent).test(config);
+ }
+}