diff options
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid')
27 files changed, 6800 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java new file mode 100644 index 0000000000..3ad6c021bd --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.SimpleByteBufferAllocator; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.BytesMessage; +import javax.jms.TextMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; + +public class TestMessageFactory +{ + private static final String MESSAGE_DATA_BYTES = "-message payload-message paylaod-message payload-message paylaod"; + + public static TextMessage newTextMessage(Session session, int size) throws JMSException + { + return session.createTextMessage(createMessagePayload(size)); + } + + public static TextMessage newJMSTextMessage(Session session, int size, String encoding) throws JMSException + { + + TextMessage message = session.createTextMessage(); + message.clearBody(); + message.setText(createMessagePayload(size)); + return message; + } + + public static BytesMessage newBytesMessage(Session session, int size) throws JMSException + { + BytesMessage message = session.createBytesMessage(); + message.writeUTF(createMessagePayload(size)); + return message; + } + + public static StreamMessage newStreamMessage(Session session, int size) throws JMSException + { + StreamMessage message = session.createStreamMessage(); + message.writeString(createMessagePayload(size)); + return message; + } + + public static ObjectMessage newObjectMessage(Session session, int size) throws JMSException + { + if (size == 0) + { + return session.createObjectMessage(); + } + else + { + return session.createObjectMessage(createMessagePayload(size)); + } + } + + /** + * Creates an ObjectMessage with given size and sets the JMS properties (JMSReplyTo and DeliveryMode) + * @param session + * @param replyDestination + * @param size + * @param persistent + * @return the new ObjectMessage + * @throws JMSException + */ + public static ObjectMessage newObjectMessage(Session session, Destination replyDestination, int size, boolean persistent) throws JMSException + { + ObjectMessage msg = newObjectMessage(session, size); + + // Set the messages persistent delivery flag. + msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Ensure that the temporary reply queue is set as the reply to destination for the message. + if (replyDestination != null) + { + msg.setJMSReplyTo(replyDestination); + } + + return msg; + } + + public static String createMessagePayload(int size) + { + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count <= (size - MESSAGE_DATA_BYTES.length())) + { + buf.append(MESSAGE_DATA_BYTES); + count += MESSAGE_DATA_BYTES.length(); + } + if (count < size) + { + buf.append(MESSAGE_DATA_BYTES, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..cac0064785 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; + +import javax.jms.ConnectionFactory; + +class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) + { + return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path"); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java new file mode 100644 index 0000000000..14db74438f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public abstract class AbstractConfig +{ + public boolean setOptions(String[] argv) + { + try + { + for(int i = 0; i < argv.length - 1; i += 2) + { + String key = argv[i]; + String value = argv[i+1]; + setOption(key, value); + } + return true; + } + catch(Exception e) + { + System.out.println(e.getMessage()); + } + return false; + } + + protected int parseInt(String msg, String i) + { + try + { + return Integer.parseInt(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i, e); + } + } + + protected long parseLong(String msg, String i) + { + try + { + return Long.parseLong(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i, e); + } + } + + public abstract void setOption(String key, String value); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..a9984eb09a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +public interface ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException; +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java new file mode 100644 index 0000000000..ff2377f087 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/Connector.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +public class Connector +{ + public Connection createConnection(ConnectorConfig config) throws Exception + { + return getConnectionFactory(config).createConnection(); + } + + ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception + { + String factory = config.getFactory(); + if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName(); + System.out.println("Using " + factory); + return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java new file mode 100644 index 0000000000..b120ed3f12 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public interface ConnectorConfig +{ + public String getHost(); + public int getPort(); + public String getFactory(); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..a0248a8f79 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.client.JMSAMQException; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.MBeanException; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.naming.NameNotFoundException; +import java.util.Hashtable; + +public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException + { + ConnectionFactory cf = null; + InitialContext ic = null; + Hashtable ht = new Hashtable(); + ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); + String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01"); + String jbossPort = System.getProperty("jboss.port", "1099"); + ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort); + ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); + + try + { + ic = new InitialContext(ht); + if (!doesDestinationExist("topictest.messages", ic)) + { + deployTopic("topictest.messages", ic); + } + if (!doesDestinationExist("topictest.control", ic)) + { + deployTopic("topictest.control", ic); + } + + cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); + return cf; + } + catch (NamingException e) + { + throw new JMSAMQException("Unable to lookup object: " + e, e); + } + catch (Exception e) + { + throw new JMSAMQException("Error creating topic: " + e, e); + } + } + + private boolean doesDestinationExist(String name, InitialContext ic) throws Exception + { + try + { + ic.lookup("/" + name); + } + catch (NameNotFoundException e) + { + return false; + } + return true; + } + + private void deployTopic(String name, InitialContext ic) throws Exception + { + MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic); + + ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer"); + + String jndiName = "/" + name; + try + { + mBeanServer.invoke(serverObjectName, "createTopic", + new Object[]{name, jndiName}, + new String[]{"java.lang.String", "java.lang.String"}); + } + catch (MBeanException e) + { + System.err.println("Error: " + e); + System.err.println("Cause: " + e.getCause()); + } + } + + private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException + { + return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor"); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Config.java new file mode 100644 index 0000000000..5b6169ed2d --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Config.java @@ -0,0 +1,243 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.oldtopic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.Connector; +import org.apache.qpid.config.AbstractConfig; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +class Config extends AbstractConfig implements ConnectorConfig +{ + + private String host = "localhost"; + private int port = 5672; + private String factory = null; + + private int payload = 256; + private int messages = 1000; + private int clients = 1; + private int batch = 1; + private long delay = 1; + private int warmup; + private int ackMode= AMQSession.NO_ACKNOWLEDGE; + private String clientId; + private String subscriptionId; + private boolean persistent; + + public Config() + { + } + + int getAckMode() + { + return ackMode; + } + + void setPayload(int payload) + { + this.payload = payload; + } + + int getPayload() + { + return payload; + } + + void setClients(int clients) + { + this.clients = clients; + } + + int getClients() + { + return clients; + } + + void setMessages(int messages) + { + this.messages = messages; + } + + int getMessages() + { + return messages; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public String getFactory() + { + return factory; + } + + public void setPort(int port) + { + this.port = port; + } + + int getBatch() + { + return batch; + } + + void setBatch(int batch) + { + this.batch = batch; + } + + int getWarmup() + { + return warmup; + } + + void setWarmup(int warmup) + { + this.warmup = warmup; + } + + public long getDelay() + { + return delay; + } + + public void setDelay(long delay) + { + this.delay = delay; + } + + String getClientId() + { + return clientId; + } + + String getSubscriptionId() + { + return subscriptionId; + } + + boolean usePersistentMessages() + { + return persistent; + } + + public void setOption(String key, String value) + { + if("-host".equalsIgnoreCase(key)) + { + setHost(value); + } + else if("-port".equalsIgnoreCase(key)) + { + try + { + setPort(Integer.parseInt(value)); + } + catch(NumberFormatException e) + { + throw new RuntimeException("Bad port number: " + value); + } + } + else if("-payload".equalsIgnoreCase(key)) + { + setPayload(parseInt("Bad payload size", value)); + } + else if("-messages".equalsIgnoreCase(key)) + { + setMessages(parseInt("Bad message count", value)); + } + else if("-clients".equalsIgnoreCase(key)) + { + setClients(parseInt("Bad client count", value)); + } + else if("-batch".equalsIgnoreCase(key)) + { + setBatch(parseInt("Bad batch count", value)); + } + else if("-delay".equalsIgnoreCase(key)) + { + setDelay(parseLong("Bad batch delay", value)); + } + else if("-warmup".equalsIgnoreCase(key)) + { + setWarmup(parseInt("Bad warmup count", value)); + } + else if("-ack".equalsIgnoreCase(key)) + { + ackMode = parseInt("Bad ack mode", value); + } + else if("-factory".equalsIgnoreCase(key)) + { + factory = value; + } + else if("-clientId".equalsIgnoreCase(key)) + { + clientId = value; + } + else if("-subscriptionId".equalsIgnoreCase(key)) + { + subscriptionId = value; + } + else if("-persistent".equalsIgnoreCase(key)) + { + persistent = "true".equalsIgnoreCase(value); + } + else + { + System.out.println("Ignoring unrecognised option: " + key); + } + } + + static String getAckModeDescription(int ackMode) + { + switch(ackMode) + { + case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE"; + case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE"; + case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE"; + case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE"; + case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE"; + } + return "AckMode=" + ackMode; + } + + public Connection createConnection() throws Exception + { + return new Connector().createConnection(this); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Listener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Listener.java new file mode 100644 index 0000000000..4732782d4c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Listener.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.oldtopic; +import org.apache.log4j.*; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +public class Listener implements MessageListener +{ + private final Connection _connection; + private final MessageProducer _controller; + private final javax.jms.Session _session; + private final MessageFactory _factory; + private boolean init; + private int count; + private long start; + + Listener(Connection connection, int ackMode) throws Exception + { + this(connection, ackMode, null); + } + + Listener(Connection connection, int ackMode, String name) throws Exception + { + _connection = connection; + _session = connection.createSession(false, ackMode); + _factory = new MessageFactory(_session); + + //register for events + if(name == null) + { + _factory.createTopicConsumer().setMessageListener(this); + } + else + { + _factory.createDurableTopicConsumer(name).setMessageListener(this); + } + + _connection.start(); + + _controller = _factory.createControlPublisher(); + System.out.println("Waiting for messages " + + Config.getAckModeDescription(ackMode) + + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")") + + "..."); + + } + + private void shutdown() + { + try + { + _session.close(); + _connection.stop(); + _connection.close(); + } + catch(Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + try + { + String msg = getReport(); + _controller.send(_factory.createReportResponseMessage(msg)); + System.out.println("Sent report: " + msg); + } + catch(Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = (System.currentTimeMillis() - start); + return "Received " + count + " in " + time + "ms"; + } + + public void onMessage(Message message) + { + if(!init) + { + start = System.currentTimeMillis(); + count = 0; + init = true; + } + + if(_factory.isShutdown(message)) + { + shutdown(); + } + else if(_factory.isReport(message)) + { + //send a report: + report(); + init = false; + } + else if (++count % 100 == 0) + { + System.out.println("Received " + count + " messages."); + } + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + if(config.getClientId() != null) + { + con.setClientID(config.getClientId()); + } + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/MessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/MessageFactory.java new file mode 100644 index 0000000000..b2fbeb7e35 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/MessageFactory.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.oldtopic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; + +import javax.jms.*; + +/** + */ +class MessageFactory +{ + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + private final Session _session; + private final Topic _topic; + private final Topic _control; + private final byte[] _payload; + + + MessageFactory(Session session) throws JMSException + { + this(session, 256); + } + + MessageFactory(Session session, int size) throws JMSException + { + _session = session; +/* if(session instanceof AMQSession) + { + _topic = new AMQTopic("topictest.messages"); + _control = new AMQTopic("topictest.control"); + } + else*/ + { + _topic = session.createTopic("topictest.messages"); + _control = session.createTopic("topictest.control"); + } + _payload = new byte[size]; + + for(int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } + } + + Topic getTopic() + { + return _topic; + } + + Message createEventMessage() throws JMSException + { + BytesMessage msg = _session.createBytesMessage(); + msg.writeBytes(_payload); + return msg; + } + + Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageConsumer createControlConsumer() throws Exception + { + return _session.createConsumer(_control); + } + + MessageProducer createTopicPublisher() throws Exception + { + return _session.createProducer(_topic); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_control); + } + + private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Publisher.java b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Publisher.java new file mode 100644 index 0000000000..f811704323 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/oldtopic/Publisher.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.oldtopic; + +import javax.jms.*; + +public class Publisher implements MessageListener +{ + private final Object _lock = new Object(); + private final Connection _connection; + private final Session _session; + private final MessageFactory _factory; + private final MessageProducer _publisher; + private int _count; + + Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception + { + _connection = connection; + _session = _connection.createSession(false, ackMode); + _factory = new MessageFactory(_session, size); + _publisher = _factory.createTopicPublisher(); + _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + "."); + } + + private void test(Config config) throws Exception + { + test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup()); + } + + private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception + { + _factory.createControlConsumer().setMessageListener(this); + _connection.start(); + + if(warmup > 0) + { + System.out.println("Runing warmup (" + warmup + " msgs)"); + long time = batch(warmup, consumerCount); + System.out.println("Warmup completed in " + time + "ms"); + } + + long[] times = new long[batches]; + for(int i = 0; i < batches; i++) + { + if(i > 0) Thread.sleep(delay*1000); + times[i] = batch(msgCount, consumerCount); + System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms."); + } + + long min = min(times); + long max = max(times); + System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); + + //request shutdown + _publisher.send(_factory.createShutdownMessage()); + + _connection.stop(); + _connection.close(); + } + + private long batch(int msgCount, int consumerCount) throws Exception + { + _count = consumerCount; + long start = System.currentTimeMillis(); + publish(msgCount); + waitForCompletion(consumerCount); + return System.currentTimeMillis() - start; + } + + private void publish(int count) throws Exception + { + + //send events + for (int i = 0; i < count; i++) + { + _publisher.send(_factory.createEventMessage()); + if ((i + 1) % 100 == 0) + { + System.out.println("Sent " + (i + 1) + " messages"); + } + } + + //request report + _publisher.send(_factory.createReportRequestMessage()); + } + + private void waitForCompletion(int consumers) throws Exception + { + System.out.println("Waiting for completion..."); + synchronized (_lock) + { + while (_count > 0) + { + _lock.wait(); + } + } + } + + + public void onMessage(Message message) + { + System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining"); + if (_count == 0) + { + synchronized (_lock) + { + _lock.notify(); + } + } + } + + static long min(long[] times) + { + long min = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + min = Math.min(min, times[i]); + } + return min; + } + + static long max(long[] times) + { + long max = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + max = Math.max(max, times[i]); + } + return max; + } + + static long avg(long[] times, long min, long max) + { + long sum = 0; + for(int i = 0; i < times.length; i++) + { + sum += times[i]; + } + sum -= min; + sum -= max; + + return (sum / (times.length - 2)); + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + int size = config.getPayload(); + int ackMode = config.getAckMode(); + boolean persistent = config.usePersistentMessages(); + new Publisher(con, size, ackMode, persistent).test(config); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..dc78276edd --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><td> Responsibilities <th> Collaborations + * <tr><td> Send many ping messages and output timings asynchronously on batches received. + * </table> + */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingAsyncTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + } + + /** + * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness + */ + public void testAsyncPingOk(int numPings) throws Exception + { + // _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // get prefill count to update the expected count + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + + // Ensure that at least one ping was requeusted. + if (numPings + preFill == 0) + { + _logger.error("Number of pings requested was zero."); + fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); + // String messageCorrelationId = perThreadSetup._correlationId; + // _logger.debug("messageCorrelationId = " + messageCorrelationId); + + + // Initialize the count and timing controller for the new correlation id. + // This perCorrelationId is only used for controlling the test. + // The PingClient itself uses its own perCorrelationId see in PingPongProducer + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); + perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + + // Must be called before pingAndWaitForReply to setup the CorrelationID. + // This is required because pingClient.start() will start all client threads + // This means that the CorrelationID must be registered before hand. + pingClient.setupCorrelationID(perThreadSetup._correlationId, perCorrelationId._expectedCount); + + // Start the client connection if: + // 1) we are not in a SEND_ONLY test. + // 2) if we have not yet started client because messages are sitting on broker. + // This is either due to a preFill or a consume only test. + if (!testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME) && + (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))) + { + pingClient.start(); + } + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < perCorrelationId._expectedCount) + { + perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); + } + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(perThreadSetup._correlationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** The latency recoreded for the batch */ + private long _batchLatency = 0; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + // Record the latency for the whole batch + _batchLatency += latency; + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId());*/ + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + // Record the total latency for the batch. + // if batchSize=1 then this will just be the message latency + tc.completeTest(true, receivedInBatch, null, _batchSize == 1 ? latency : _batchLatency); + // Reset latency + _batchLatency = 0; + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java new file mode 100644 index 0000000000..dcfc67d4fc --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import javax.jms.Destination; + +import java.util.List; +import java.util.Properties; + +/** + * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} + * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. + * It is an all in one ping client, that produces and consumes its own pings. + * + * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many + * are created they will all be run in parallel and be active in sending and consuming pings at the same time. + * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear + * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of + * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these + * conditions. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer} + * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. + * </table> + */ +public class PingClient extends PingPongProducer +{ + /** Used for debugging. */ + private final Logger log = Logger.getLogger(PingClient.class); + + /** Used to count the number of ping clients created. */ + private static int _pingClientCount; + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates + * producer and consumer sessions on it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingClient(Properties overrides) throws Exception + { + super(overrides); + + _pingClientCount++; + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List<Destination> getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. + * + * @return The scaling up of the number of expected pub/sub pings. + */ + public int getConsumersPerDestination() + { + log.debug("public int getConsumersPerDestination(): called"); + + if (_isUnique) + { + log.debug(_noOfConsumers + " consumer per destination."); + + return _noOfConsumers; + } + else + { + log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination."); + + return _pingClientCount * _noOfConsumers; + } + } + + public int getClientCount() + { + return _pingClientCount; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java new file mode 100644 index 0000000000..a15897c82b --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -0,0 +1,452 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.util.CommandLineParser; + +import org.apache.qpid.junit.extensions.util.MathUtils; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and + * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop + * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the + * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with + * failure conditions when using durable messaging. + * + * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to + * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases + * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings + * with. + * + * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console. + * + * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and + * additionally accepts the following parameters: + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> numMessages <td> 100 <td> The total number of messages to send. + * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'. + * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours, + * m minutes and s seconds). + * </table> + * + * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up + * when no parameters are specified. + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped. + * <tr><td> transacted <td> true <td> Only makes sense to test with transactions. + * <tr><td> persistent <td> true <td> Only makes sense to test persistent. + * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages. + * <tr><td> commitBatchSize <td> 10 + * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds. + * </table> + * + * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits + * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will + * wait for the second signal before receiving its pings. + * + * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages + * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, + * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide + * custom behaviour with alternative implementations of this method (for example taking a backup). + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Send and receive pings. + * <tr><td> Accept user input to signal stop sending. + * <tr><td> Accept user input to signal start receiving. + * <tr><td> Provide feedback on pings sent versus pings received. + * <tr><td> Provide extension point for arbitrary action on a particular message count. + * </table> + */ +public class PingDurableClient extends PingPongProducer implements ExceptionListener +{ + private static final Logger log = Logger.getLogger(PingDurableClient.class); + + public static final String NUM_MESSAGES_PROPNAME = "numMessages"; + public static final String NUM_MESSAGES_DEFAULT = "100"; + public static final String DURATION_PROPNAME = "duration"; + public static final String DURATION_DEFAULT = "30S"; + public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; + public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; + + /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ + private static final long TIME_OUT = 3000; + + static + { + defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); + defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); + defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); + defaults.setProperty(TRANSACTED_PROPNAME, "true"); + defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); + defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); + defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); + defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); + } + + /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ + private int numMessages; + + /** Holds the number of messages to send before taking triggering the action. */ + private int numMessagesToAction; + + /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ + private long duration; + + /** Used to indciate that this application should terminate. Set by the shutdown hook. */ + private boolean terminate = false; + + /** + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingDurableClient(Properties overrides) throws Exception + { + super(overrides); + log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); + + // Extract the additional configuration parameters. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); + String durationSpec = properties.getProperty(DURATION_PROPNAME); + numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); + + if (durationSpec != null) + { + duration = MathUtils.parseDuration(durationSpec) * 1000000; + } + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingDurableClient pingProducer = new PingDurableClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.closeConnection(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Performs the main test procedure implemented by this ping client. See the class level comment for details. + */ + protected int send() throws Exception + { + log.debug("public void sendWaitReceive(): called"); + + log.debug("duration = " + duration); + log.debug("numMessages = " + numMessages); + + if (duration > 0) + { + System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); + } + + if (_rate > 0) + { + System.out.println("Sending at " + _rate + " messages per second."); + } + + if (numMessages > 0) + { + System.out.println("Sending up to " + numMessages + " messages."); + } + + // Establish the connection and the message producer. + establishConnection(true, false); + _connection.start(); + + Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + + // Send pings until a terminating condition is received. + boolean endCondition = false; + int messagesSent = 0; + int messagesCommitted = 0; + int messagesNotCommitted = 0; + long start = System.nanoTime(); + + // Clear console in. + clearConsole(); + + while (!endCondition) + { + boolean committed = false; + + try + { + committed = sendMessage(messagesSent, message) && _transacted; + + messagesSent++; + messagesNotCommitted++; + + // Keep count of the number of messsages currently committed and pending commit. + if (committed) + { + log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); + messagesCommitted += messagesNotCommitted; + messagesNotCommitted = 0; + + System.out.println("Commited: " + messagesCommitted); + } + } + catch (JMSException e) + { + log.debug("Got JMSException whilst sending."); + _publish = false; + } + + // Perform the arbitrary action if the number of messages sent has reached the right number. + if (messagesSent == numMessagesToAction) + { + System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " + + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); + takeAction(); + } + + // Determine if the end condition has been met, based on the number of messages, time passed, errors on + // the connection or user input. + long now = System.nanoTime(); + + if ((duration != 0) && ((now - start) > duration)) + { + System.out.println("Send halted because duration expired."); + endCondition = true; + } + else if ((numMessages != 0) && (messagesSent >= numMessages)) + { + System.out.println("Send halted because # messages completed."); + endCondition = true; + } + else if (System.in.available() > 0) + { + System.out.println("Send halted by user input."); + endCondition = true; + + clearConsole(); + } + else if (!_publish) + { + System.out.println("Send halted by error on the connection."); + endCondition = true; + } + } + + log.debug("messagesSent = " + messagesSent); + log.debug("messagesCommitted = " + messagesCommitted); + log.debug("messagesNotCommitted = " + messagesNotCommitted); + + System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted + + ", Messages not Committed = " + messagesNotCommitted); + + return messagesSent; + } + + protected void closeConnection() + { + // Clean up the connection. + try + { + close(); + } + catch (JMSException e) + { + log.debug("There was an error whilst closing the connection: " + e, e); + System.out.println("There was an error whilst closing the connection."); + + // Ignore as did best could manage to clean up. + } + } + + protected void receive(int messagesSent) throws Exception + { + // Re-establish the connection and the message consumer. + _queueJVMSequenceID = new AtomicInteger(); + _queueSharedID = new AtomicInteger(); + + establishConnection(false, true); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); + + // Try to receive all of the pings that were successfully sent. + int messagesReceived = 0; + boolean endCondition = false; + + while (!endCondition) + { + // Message received = _consumer.receiveNoWait(); + Message received = _consumer[0].receive(TIME_OUT); + log.debug("received = " + received); + + if (received != null) + { + messagesReceived++; + } + + // Determine if the end condition has been met, based on the number of messages and time passed since last + // receiving a message. + if (received == null) + { + System.out.println("Timed out."); + endCondition = true; + } + else if (messagesReceived >= messagesSent) + { + System.out.println("Got all messages."); + endCondition = true; + } + } + + // Ensure messages received are committed. + if (_consTransacted) + { + try + { + _consumerSession[0].commit(); + System.out.println("Committed for all messages received."); + } + catch (JMSException e) + { + log.debug("Error during commit: " + e, e); + System.out.println("Error during commit."); + try + { + _consumerSession[0].rollback(); + System.out.println("Rolled back on all messages received."); + } + catch (JMSException e2) + { + log.debug("Error during rollback: " + e, e); + System.out.println("Error on roll back of all messages received."); + } + + } + } + + log.debug("messagesReceived = " + messagesReceived); + + System.out.println("Messages received: " + messagesReceived); + + // Clean up the connection. + close(); + } + + /** + * Clears any pending input from the console. + */ + private void clearConsole() + { + try + { + BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); + + // System.in.skip(System.in.available()); + while (bis.ready()) + { + bis.readLine(); + } + } + catch (IOException e) + { } + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List<Destination> getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with + * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the + * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving + * message should stop, not that the application should termiante. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + terminate = true; + } + }); + } + + /** + * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default + * implementation does nothing. + */ + public void takeAction() + { } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java new file mode 100644 index 0000000000..5ba4004c56 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -0,0 +1,311 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping + * messages and output timings for sampled individual pings. </table> + */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** Compile all the tests into a test suite. */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + Message msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + private boolean _strictAMQP; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, + AMQSession.STRICT_AMQP_DEFAULT)); + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch, latency); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java new file mode 100644 index 0000000000..2fe852af77 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.ping; + +import java.util.Properties; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.util.CommandLineParser; + +/** + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingSendOnlyClient extends PingDurableClient +{ + private static final Logger log = Logger.getLogger(PingSendOnlyClient.class); + + public PingSendOnlyClient(Properties overrides) throws Exception + { + super(overrides); + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + pingProducer.send(); + pingProducer.waitForUser("Press return to close connection and quit."); + pingProducer.closeConnection(); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize); + + // Timestamp the message in nanoseconds. + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + return msg; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java new file mode 100644 index 0000000000..cf16abc596 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times + * simultaneously to simluate many clients/producers/connections. + * + * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. + * + * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, + * except if the connection is lost in which case an attempt to re-establish the setup is made. + * + * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the + * temporary queue. + * + * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingTestPerf(String name) + { + super(name); + + _logger.debug("testParameters = " + testParameters); + } + + /** + * Compile all the tests into a test suite. + * @return The test method testPingOk. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingTestPerf("testPingOk")); + + return suite; + } + + public void testPingOk(int numPings) throws Exception + { + if (numPings == 0) + { + Assert.fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + if (perThreadSetup == null) + { + Assert.fail("Could not get per thread test setup, it was null."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + + numReplies); + } + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently. + synchronized (this) + { + // Establish a client to ping a Destination and listen the reply back from same Destination + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Called after all threads have completed their setup. + */ + public void postThreadSetUp() + { + _logger.debug("public void postThreadSetUp(): called"); + + PerThreadSetup perThreadSetup = threadSetup.get(); + // Prefill the broker unless we are in consume only mode. + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + { + try + { + // Manually set the correlation ID to 1. This is not ideal but it is the + // value that the main test loop will use. + perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount())); + + // Note with a large preFill and non-tx session the messages will be + // rapidly pushed in to the mina buffers. OOM's are a real risk here. + // Should perhaps consider using a TX session for the prefill. + + long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); + + // Only delay if we are + // not doing send only + // and we have consumers + // and a delayBeforeConsume + if (!(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME)) + && (testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) + && delayBeforeConsume > 0) + { + + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + // Only do logging if in verbose mode. + if (verbose) + { + if (delayBeforeConsume > 60000) + { + long minutes = delayBeforeConsume / 60000; + long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; + long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); + _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); + } + else + { + _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); + } + } + + Thread.sleep(delayBeforeConsume); + + if (verbose) + { + _logger.info("Starting Test."); + } + } + + // We can't start the client's here as the test client has not yet been configured to receieve messages. + // only when the test method is executed will the correlationID map be set up and ready to consume + // the messages we have sent here. + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + else //Only start the consumer if we are not preFilling. + { + // Start the consumers, unless we have data on the broker + // already this is signified by being in consume_only, we will + // start the clients after setting up the correlation IDs. + // We should also not start the clients if we are in Send only + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && + !(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME))) + { + // Start the client connection + try + { + perThreadSetup._pingClient.start(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) + { + perThreadSetup._pingClient.close(); + } + } + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + finally + { + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping client. + */ + protected PingClient _pingClient; + protected String _correlationId; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java new file mode 100644 index 0000000000..8e010ccf07 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -0,0 +1,453 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import java.io.IOException; +import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.Session; +import org.apache.qpid.topic.Config; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return + * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes + * too. + * + * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages + * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique + * temporary queue or the correlation id to correlate the original message to the reply. + * + * <p/>There is a verbose mode flag which causes information about each ping to be output to the console + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should + * be disabled for real timing tests as writing to the console will slow things down. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Bounce back messages to their reply to destination. + * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url. + * </table> + * + * @todo Replace the command line parsing with a neater tool. + * + * @todo Make verbose accept a number of messages, only prints to console every X messages. + */ +public class PingPongBouncer implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); + + /** The default prefetch size for the message consumer. */ + private static final int PREFETCH = 1; + + /** The default no local flag for the message consumer. */ + private static final boolean NO_LOCAL = true; + + private static final String DEFAULT_DESTINATION_NAME = "ping"; + + /** The default exclusive flag for the message consumer. */ + private static final boolean EXCLUSIVE = false; + + /** A convenient formatter to use when time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + private boolean _verbose = false; + + /** Determines whether this bounce back client bounces back messages persistently. */ + private boolean _persistent = false; + + private Destination _consumerDestination; + + /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ + private Destination _lastResponseDest; + + /** The producer for sending replies with. */ + private MessageProducer _replyProducer; + + /** The consumer controlSession. */ + private Session _consumerSession; + + /** The producer controlSession. */ + private Session _producerSession; + + /** Holds the connection to the broker. */ + private AMQConnection _connection; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + private boolean _isPubSub = false; + + /** + * This flag is used to indicate that the user should be prompted to kill a broker, in order to test + * failover, immediately before committing a transaction. + */ + protected boolean _failBeforeCommit = false; + + /** + * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test + * failover, immediate after committing a transaction. + */ + protected boolean _failAfterCommit = false; + + /** + * Creates a PingPongBouncer on the specified producer and consumer sessions. + * + * @param brokerDetails The addresses of the brokers to connect to. + * @param username The broker username. + * @param password The broker password. + * @param virtualpath The virtual host name within the broker. + * @param destinationName The name of the queue to receive pings on + * (or root of the queue name where many queues are generated). + * @param persistent A flag to indicate that persistent message should be used. + * @param transacted A flag to indicate that pings should be sent within transactions. + * @param selector A message selector to filter received pings with. + * @param verbose A flag to indicate that message timings should be sent to the console. + * + * @throws Exception All underlying exceptions allowed to fall through. This is only test code... + */ + public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, + String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, + boolean pubsub) throws Exception + { + // Create a client id to uniquely identify this client. + InetAddress address = InetAddress.getLocalHost(); + String clientId = address.getHostName() + System.currentTimeMillis(); + _verbose = verbose; + _persistent = persistent; + setPubSub(pubsub); + // Connect to the broker. + setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); + _logger.info("Connected with URL:" + getConnection().toURL()); + + // Set up the failover notifier. + getConnection().setConnectionListener(new FailoverNotifier()); + + // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the + // command line option. + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // Create the queue to listen for message on. + createConsumerDestination(destinationName); + MessageConsumer consumer = + _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); + + // Create a producer for the replies, without a default destination. + _replyProducer = _producerSession.createProducer(null); + _replyProducer.setDisableMessageTimestamp(true); + _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Set this up to listen for messages on the queue. + consumer.setMessageListener(this); + } + + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ + public static void main(String[] args) throws Exception + { + System.out.println("Starting..."); + + // Display help on the command line. + if (args.length == 0) + { + _logger.info("Running test with default values..."); + //usage(); + //System.exit(0); + } + + // Extract all command line parameters. + Config config = new Config(); + config.setOptions(args); + String brokerDetails = config.getHost() + ":" + config.getPort(); + String virtualpath = "test"; + String destinationName = config.getDestination(); + if (destinationName == null) + { + destinationName = DEFAULT_DESTINATION_NAME; + } + + String selector = config.getSelector(); + boolean transacted = config.isTransacted(); + boolean persistent = config.usePersistentMessages(); + boolean pubsub = config.isPubSub(); + boolean verbose = true; + + //String selector = null; + + // Instantiate the ping pong client with the command line options and start it running. + PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, + selector, verbose, pubsub); + pingBouncer.getConnection().start(); + + System.out.println("Waiting..."); + } + + private static void usage() + { + System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + + "-persistent : (true/false). Default is false\n" + + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); + } + + /** + * This is a callback method that is notified of all messages for which this has been registered as a message + * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to + * destination of the message. + * + * @param message The message that triggered this callback. + */ + public void onMessage(Message message) + { + try + { + String messageCorrelationId = message.getJMSCorrelationID(); + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " + + messageCorrelationId); + } + + // Get the reply to destination from the message and check it is set. + Destination responseDest = message.getJMSReplyTo(); + + if (responseDest == null) + { + _logger.debug("Cannot send reply because reply-to destination is null."); + + return; + } + + // Spew out some timing information if verbose mode is on. + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Time to bounce point: " + diff); + } + } + + // Correlate the reply to the original. + message.setJMSCorrelationID(messageCorrelationId); + + // Send the receieved message as the pong reply. + _replyProducer.send(responseDest, message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " + + messageCorrelationId); + } + + // Commit the transaction if running in transactional mode. + commitTx(_producerSession); + } + catch (JMSException e) + { + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } + } + + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ + public AMQConnection getConnection() + { + return _connection; + } + + /** + * Sets the connection that this ping client is using. + * + * @param connection The ping connection. + */ + public void setConnection(AMQConnection connection) + { + this._connection = connection; + } + + /** + * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. + * + * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. + */ + public void setPubSub(boolean pubsub) + { + _isPubSub = pubsub; + } + + /** + * Checks whether this client is a p2p or pub/sub ping client. + * + * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. + */ + public boolean isPubSub() + { + return _isPubSub; + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not + * a transactional controlSession, this method does nothing. + * + * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + */ + protected void commitTx(Session session) throws JMSException + { + if (session.getTransacted()) + { + try + { + if (_failBeforeCommit) + { + _logger.debug("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + _logger.debug("Failing After Commit"); + doFailover(); + } + + _logger.debug("Session Commited."); + } + catch (JMSException e) + { + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + _logger.debug("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + } + + /** + * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + * + * @param broker The name of the broker to terminate. + */ + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + } + + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ + protected void doFailover() + { + System.out.println("Kill Broker now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + + } + + private void createConsumerDestination(String name) + { + if (isPubSub()) + { + _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); + } + else + { + _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); + } + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... + */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java new file mode 100644 index 0000000000..0bf952b7e1 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -0,0 +1,1833 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.TestUtils; + +import org.apache.qpid.junit.extensions.BatchedThrottle; +import org.apache.qpid.junit.extensions.Throttle; +import org.apache.qpid.junit.extensions.util.CommandLineParser; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import java.io.*; +import java.net.InetAddress; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may + * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens + * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping + * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour + * configurable. + * + * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This + * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation + * id in the ping to be bounced back in the reply correlation id. + * + * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It + * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within + * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover + * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers. + * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping. + * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used. + * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions. + * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to. + * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over. + * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit. + * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message. + * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default. + * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch. + * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch. + * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send. + * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send. + * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once. + * <tr><td> username <td> guest <td> The username to access the broker with. + * <tr><td> password <td> guest <td> The password to access the broker with. + * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with. + * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to. + * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination. + * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies. + * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode. + * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all. + * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used. + * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value + * as the 'transacted' option if not seperately defined. + * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same + * value as 'ackMode' if not seperately defined. + * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. + * </table> + * + * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop + * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by + * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also + * registered to terminate the ping-pong loop cleanly. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a ping and wait for all responses cycle. + * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url. + * </table> + * + * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. + * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a + * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last + * message waits until all other messages have been handled before releasing producers but allows messages to be + * processed concurrently, unlike the current synchronized block. + */ +public class PingPongProducer implements Runnable, ExceptionListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(PingPongProducer.class); + + /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ + public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; + + /** Holds the default value of the override client id flag. */ + public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; + + /** Holds the name of the property to define the JNDI factory name with. */ + public static final String FACTORY_NAME_PROPNAME = "factoryName"; + + /** Holds the default JNDI name of the connection factory. */ + public static final String FACTORY_NAME_DEAFULT = "local"; + + /** Holds the name of the property to set the JNDI initial context properties with. */ + public static final String FILE_PROPERTIES_PROPNAME = "properties"; + + /** Holds the default file name of the JNDI initial context properties. */ + public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; + + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Used to set up a default message size. */ + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /** Holds the name of the property to get the ping queue name from. */ + public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; + + /** Holds the name of the default destination to send pings on. */ + public static final String PING_QUEUE_NAME_DEFAULT = "ping"; + + /** Holds the name of the property to get the queue name postfix from. */ + public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; + + /** Holds the default queue name postfix value. */ + public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; + + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the transactional mode to use for the test. */ + public static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test consumer transacted mode from. */ + public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + + /** Holds the consumer transactional mode default setting. */ + public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int RATE_DEFAULT = 0; + + /** Holds the name of the property to get the verbose mode proeprty from. */ + public static final String VERBOSE_PROPNAME = "verbose"; + + /** Holds the default verbose mode. */ + public static final boolean VERBOSE_DEFAULT = false; + + /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + public static final String PUBSUB_PROPNAME = "pubsub"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean PUBSUB_DEFAULT = false; + + /** Holds the name of the property to get the fail after commit flag from. */ + public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; + + /** Holds the default failover after commit test flag. */ + public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail before commit flag from. */ + public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; + + /** Holds the default failover before commit test flag. */ + public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail after send flag from. */ + public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; + + /** Holds the default failover after send test flag. */ + public static final boolean FAIL_AFTER_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail before send flag from. */ + public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; + + /** Holds the default failover before send test flag. */ + public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail once flag from. */ + public static final String FAIL_ONCE_PROPNAME = "failOnce"; + + /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ + public static final boolean FAIL_ONCE_DEFAULT = true; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + /** Holds the name of the proeprty to get the. */ + public static final String SELECTOR_PROPNAME = "selector"; + + /** Holds the default message selector. */ + public static final String SELECTOR_DEFAULT = ""; + + /** Holds the name of the property to get the destination count from. */ + public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /** Defines the default number of destinations to ping. */ + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /** Holds the name of the property to get the number of consumers per destination from. */ + public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; + + /** Defines the default number consumers per destination. */ + public static final int NUM_CONSUMERS_DEFAULT = 1; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long TIMEOUT_DEFAULT = 30000; + + /** Holds the name of the property to get the commit batch size from. */ + public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /** Holds the name of the property to get the unique destinations flag from. */ + public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; + + /** Defines the default value for the unique destinations property. */ + public static final boolean UNIQUE_DESTS_DEFAULT = true; + + /** Holds the name of the property to get the durable destinations flag from. */ + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Defines the default value of the durable destinations flag. */ + public static final boolean DURABLE_DESTS_DEFAULT = false; + + /** Holds the name of the proeprty to get the message acknowledgement mode from. */ + public static final String ACK_MODE_PROPNAME = "ackMode"; + + /** Defines the default message acknowledgement mode. */ + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the consumers message acknowledgement mode from. */ + public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + + /** Defines the default consumers message acknowledgement mode. */ + public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the maximum pending message size setting from. */ + public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ + public static final int MAX_PENDING_DEFAULT = 0; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int PREFETCH_DEFAULT = 100; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean NO_LOCAL_DEFAULT = false; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean EXCLUSIVE_DEFAULT = false; + + /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + + /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */ + public static final String PREFILL_PROPNAME = "preFill"; + + /** Defines the default value for the number of messages to prefill. 0,default, no messages. */ + public static final int PREFILL_DEFAULT = 0; + + /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */ + public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume"; + + /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */ + public static final long DELAY_BEFORE_CONSUME = 0; + + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String CONSUME_ONLY_PROPNAME = "consumeOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean CONSUME_ONLY_DEFAULT = false; + + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String SEND_ONLY_PROPNAME = "sendOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean SEND_ONLY_DEFAULT = false; + + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); + defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); + defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); + defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); + defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); + defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT); + } + + /** Allows setting of client ID on the connection, rather than through the connection URL. */ + protected boolean _overrideClientId; + + /** Holds the JNDI name of the JMS connection factory. */ + protected String _factoryName; + + /** Holds the name of the properties file to configure JNDI with. */ + protected String _fileProperties; + + /** Holds the broker url. */ + protected String _brokerDetails; + + /** Holds the username to access the broker with. */ + protected String _username; + + /** Holds the password to access the broker with. */ + protected String _password; + + /** Holds the virtual host on the broker to run the tests through. */ + protected String _virtualpath; + + /** Holds the root name from which to generate test destination names. */ + protected String _destinationName; + + /** Holds the default queue name postfix value. */ + protected String _queueNamePostfix; + + /** Holds the message selector to filter the pings with. */ + protected String _selector; + + /** Holds the producers transactional mode flag. */ + protected boolean _transacted; + + /** Holds the consumers transactional mode flag. */ + protected boolean _consTransacted; + + /** Determines whether this producer sends persistent messages. */ + protected boolean _persistent; + + /** Holds the acknowledgement mode used for the producers. */ + protected int _ackMode; + + /** Holds the acknowledgement mode setting for the consumers. */ + protected int _consAckMode; + + /** Determines what size of messages this producer sends. */ + protected int _messageSize; + + /** Used to indicate that the ping loop should print out whenever it pings. */ + protected boolean _verbose; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + protected boolean _isPubSub; + + /** Flag used to indicate if the destinations should be unique client. */ + protected boolean _isUnique; + + /** Flag used to indicate that durable destination should be used. */ + protected boolean _isDurable; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + protected boolean _failBeforeCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + protected boolean _failAfterCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + protected boolean _failBeforeSend; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + protected boolean _failAfterSend; + + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + protected boolean _failOnce; + + /** Holds the number of sends that should be performed in every transaction when using transactions. */ + protected int _txBatchSize; + + /** Holds the number of destinations to ping. */ + protected int _noOfDestinations; + + /** Holds the number of consumers per destination. */ + protected int _noOfConsumers; + + /** Holds the maximum send rate in herz. */ + protected int _rate; + + /** + * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended + * if this limit is breached. + */ + protected int _maxPendingSize; + + /** + * Holds the number of messages to send during the setup phase, before the clients start consuming. + */ + private Integer _preFill; + + /** + * Holds the time in ms to wait after preFilling before starting thet test. + */ + private Long _delayBeforeConsume; + + /** + * Holds a boolean value of wither this test should just consume, i.e. skips + * sending messages, but still expects to receive the specified number. + */ + private boolean _consumeOnly; + + /** + * Holds a boolean value of wither this test should just send, i.e. skips + * consuming messages, but still creates clients just doesn't start them. + */ + private boolean _sendOnly; + + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); + + /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ + private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); + + /** Holds this instances unique id. */ + private int instanceId; + + /** + * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple + * ping producers on the same JVM. + */ + private static Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Holds the connection for the message producer. */ + protected Connection _connection; + + /** Holds the consumer connections. */ + protected Connection[] _consumerConnection; + + /** Holds the controlSession on which ping replies are received. */ + protected Session[] _consumerSession; + + /** Holds the producer controlSession, needed to create ping messages. */ + protected Session _producerSession; + + /** Holds the destination where the response messages will arrive. */ + protected Destination _replyDestination; + + /** Holds the set of destinations that this ping producer pings. */ + protected List<Destination> _pingDestinations; + + /** Used to restrict the sending rate to a specified limit. */ + protected Throttle _rateLimiter; + + /** Holds a message listener that this message listener chains all its messages to. */ + protected ChainedMessageListener _chainedMessageListener = null; + + /** + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. + */ + protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); + + /** + * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers + * on the same JVM using this id generator will allow them to ping on the same queues. + */ + protected AtomicInteger _queueSharedID = new AtomicInteger(); + + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + protected boolean _publish = true; + + /** Holds the message producer to send the pings through. */ + protected MessageProducer _producer; + + /** Holds the message consumer to receive the ping replies through. */ + protected MessageConsumer[] _consumer; + + /** The prompt to display when asking the user to kill the broker for failover testing. */ + private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + + /** Holds the name for this test client to be identified to the broker with. */ + private String _clientID; + + /** Keeps count of the total messages sent purely for debugging purposes. */ + private static AtomicInteger numSent = new AtomicInteger(); + + /** + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a + * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an + * equal chance to produce messages. + */ + static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); + + /** Keeps a count of the number of message currently sent but not received. */ + static AtomicInteger _unreceived = new AtomicInteger(0); + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on + * it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingPongProducer(Properties overrides) throws Exception + { + // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + instanceId = _instanceIdGenerator.getAndIncrement(); + + // Create a set of parsed properties from the defaults overriden by the passed in values. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + // Extract the configuration properties to set the pinger up with. + _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); + _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); + _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); + _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); + _selector = properties.getProperty(SELECTOR_PROPNAME); + _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); + _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); + _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); + _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); + _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); + _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); + _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); + _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); + _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); + _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); + _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); + _rate = properties.getPropertyAsInteger(RATE_PROPNAME); + _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); + _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); + _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); + _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); + _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); + _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); + _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); + _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME); + + // Check that one or more destinations were specified. + if (_noOfDestinations < 1) + { + throw new IllegalArgumentException("There must be at least one destination."); + } + + // Set up a throttle to control the send rate, if a rate > 0 is specified. + if (_rate > 0) + { + _rateLimiter = new BatchedThrottle(); + _rateLimiter.setRate(_rate); + } + + // Create the connection and message producers/consumers. + // establishConnection(true, true); + } + + /** + * Establishes a connection to the broker and creates message consumers and producers based on the parameters + * that this ping client was created with. + * + * @param producer Flag to indicate whether or not the producer should be set up. + * @param consumer Flag to indicate whether or not the consumers should be set up. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public void establishConnection(boolean producer, boolean consumer) throws Exception + { + // log.debug("public void establishConnection(): called"); + + // Generate a unique identifying name for this client, based on it ip address and the current time. + InetAddress address = InetAddress.getLocalHost(); + // _clientID = address.getHostName() + System.currentTimeMillis(); + _clientID = "perftest_" + instanceId; + + // Create a connection to the broker. + createConnection(_clientID); + + // Create transactional or non-transactional sessions, based on the command line arguments. + _producerSession = _connection.createSession(_transacted, _ackMode); + + _consumerSession = new Session[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); + } + + // Create the destinations to send pings to and receive replies from. + if (_noOfConsumers > 0) + { + _replyDestination = _consumerSession[0].createTemporaryQueue(); + } + createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); + + // Create the message producer only if instructed to. + if (producer) + { + createProducer(); + } + + // Create the message consumer only if instructed to. + if (consumer) + { + createReplyConsumers(getReplyDestinations(), _selector); + } + } + + /** + * Establishes a connection to the broker, based on the configuration parameters that this ping client was + * created with. + * + * @param clientID The clients identifier. + * + * @throws JMSException Underlying exceptions allowed to fall through. + * @throws NamingException Underlying exceptions allowed to fall through. + * @throws IOException Underlying exceptions allowed to fall through. + */ + protected void createConnection(String clientID) throws JMSException, NamingException, IOException + { + // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + + // _log.debug("Creating a connection for the message producer."); + File propsFile = new File(_fileProperties); + InputStream is = new FileInputStream(propsFile); + Properties properties = new Properties(); + properties.load(is); + + Context context = new InitialContext(properties); + ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); + _connection = factory.createConnection(_username, _password); + + if (_overrideClientId) + { + _connection.setClientID(clientID); + } + + // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + + _consumerConnection = new Connection[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i] = factory.createConnection(_username, _password); + // _consumerConnection[i].setClientID(clientID); + } + } + + /** + * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs + * to be started to bounce the pings back again. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + + // Create a ping producer overriding its defaults with all options passed on the command line. + PingPongProducer pingProducer = new PingPongProducer(options); + pingProducer.establishConnection(true, true); + + // Start the ping producers dispatch thread running. + pingProducer._connection.start(); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + pingProducer._connection.setExceptionListener(pingProducer); + + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.run(); + pingThread.join(); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Convenience method for a short pause. + * + * @param sleepTime The time in milliseconds to pause for. + */ + public static void pause(long sleepTime) + { + if (sleepTime > 0) + { + try + { + Thread.sleep(sleepTime); + } + catch (InterruptedException ie) + { } + } + } + + /** + * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to + * destination of this pinger. + * + * @return The single reply to destination of this pinger, wrapped in a list. + */ + public List<Destination> getReplyDestinations() + { + // log.debug("public List<Destination> getReplyDestinations(): called"); + + List<Destination> replyDestinations = new ArrayList<Destination>(); + replyDestinations.add(_replyDestination); + + // log.debug("replyDestinations = " + replyDestinations); + + return replyDestinations; + } + + /** + * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery + * flag is set accoring the ping producer creation options. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createProducer() throws JMSException + { + // log.debug("public void createProducer(): called"); + + _producer = (MessageProducer) _producerSession.createProducer(null); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + } + + /** + * Creates consumers for the specified number of destinations. The destinations themselves are also created by this + * method. + * + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. + * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the + * numbering with all pingers on the same JVM. + * @param durable If the destinations are durable topics. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, + boolean durable) throws JMSException + { + /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called");*/ + + _pingDestinations = new ArrayList<Destination>(); + + // Create the desired number of ping destinations and consumers for them. + // log.debug("Creating " + noOfDestinations + " destinations to ping."); + + for (int i = 0; i < noOfDestinations; i++) + { + Destination destination; + String id; + + // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. + if (unique) + { + // log.debug("Creating unique destinations."); + id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); + } + else + { + // log.debug("Creating shared destinations."); + id = "_" + _queueSharedID.incrementAndGet(); + } + + // Check if this is a pub/sub pinger, in which case create topics. + if (_isPubSub) + { + destination = _producerSession.createTopic(rootName + id); + // log.debug("Created non-durable topic " + destination); + + if (durable) + { + _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); + } + } + // Otherwise this is a p2p pinger, in which case create queues. + else + { + destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); + // log.debug("Created queue " + destination); + } + + // Keep the destination. + _pingDestinations.add(destination); + } + } + + /** + * Creates consumers for the specified destinations and registers this pinger to listen to their messages. + * + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. + * + * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. + */ + public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException + { + /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations + + ", String selector = " + selector + "): called");*/ + + log.debug("There are " + destinations.size() + " destinations."); + log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); + + for (Destination destination : destinations) + { + _consumer = new MessageConsumer[_noOfConsumers]; + + // If we don't have consumers then ensure we have created the + // destination. + if (_noOfConsumers == 0) + { + _producerSession.createConsumer(destination, selector, + NO_LOCAL_DEFAULT).close(); + } + + for (int i = 0; i < _noOfConsumers; i++) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); + + final int consumerNo = i; + + _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + onMessageWithConsumerNo(message, consumerNo); + } + }); + + log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + } + } + } + + /** + * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a + * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the + * replies map. + * + * @param message The received message. + * @param consumerNo The consumer number within this test pinger instance. + */ + public void onMessageWithConsumerNo(Message message, int consumerNo) + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); + try + { + long now = System.nanoTime(); + long timestamp = getTimestamp(message); + long pingTime = now - timestamp; + + // NDC.push("id" + instanceId + "/cons" + consumerNo); + + // Extract the messages correlation id. + String correlationID = message.getJMSCorrelationID(); + // log.debug("correlationID = " + correlationID); + + // int num = message.getIntProperty("MSG_NUM"); + // log.info("Message " + num + " received."); + + boolean isRedelivered = message.getJMSRedelivered(); + // log.debug("isRedelivered = " + isRedelivered); + + if (!isRedelivered) + { + // Countdown on the traffic light if there is one for the matching correlation id. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + if (perCorrelationId != null) + { + CountDownLatch trafficLight = perCorrelationId.trafficLight; + + // Restart the timeout timer on every message. + perCorrelationId.timeOutStart = System.nanoTime(); + + // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + + // Release waiting senders if there are some and using maxPending limit. + if ((_maxPendingSize > 0)) + { + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + + // synchronized (_sendPauseMonitor) + // { + if (unreceivedSize < _maxPendingSize) + { + _sendPauseMonitor.poll(); + } + // } + } + + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount; + long remainingCount; + + synchronized (trafficLight) + { + trafficLight.countDown(); + + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; + + // NDC.push("/rem" + remainingCount); + + // log.debug("remainingCount = " + remainingCount); + // log.debug("trueCount = " + trueCount); + + // Commit on transaction batch size boundaries. At this point in time the waiting producer + // remains blocked, even on the last message. + // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on + // each batch boundary. For pub/sub each consumer gets every message so no division is done. + // When running in client ack mode, an ack is done instead of a commit, on the commit batch + // size boundaries. + long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // _noOfConsumers can be set to 0 on the command line but we will not get here to + // divide by 0 as this is executed by the onMessage code when a message is recevied. + // no consumers means no message reception. + + + // log.debug("commitCount = " + commitCount); + + if ((commitCount % _txBatchSize) == 0) + { + if (_consAckMode == 2) + { + // log.debug("Doing client ack for consumer " + consumerNo + "."); + message.acknowledge(); + } + else + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + // log.info("Tx committed on consumer " + consumerNo); + } + } + + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } + } + } + else + { + log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID); + log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet()); + } + } + else + { + log.warn("Got redelivered message, ignoring."); + } + } + catch (JMSException e) + { + log.warn("There was a JMSException: " + e.getMessage(), e); + } + finally + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // NDC.clear(); + } + } + + public void setupCorrelationID(String correlationId, int expectedCount) + { + PerCorrelationId perCorrelationId = new PerCorrelationId(); + + // Create a count down latch to count the number of replies with. This is created before the messages are + // sent so that the replies cannot be received before the count down is created. + // One is added to this, so that the last reply becomes a special case. The special case is that the + // chained message listener must be called before this sender can be unblocked, but that decrementing the + // countdown needs to be done before the chained listener can be called. + perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1); + + perCorrelationIds.put(correlationId, perCorrelationId); + } + + + /** + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify + * the correlation id. + * + * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination + * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings. + * + * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur. + * + * @param message The message to send. If this is null, one is generated. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. + * @param messageCorrelationId The message correlation id. If this is null, one is generated. + * + * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait + * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout + */ + public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId); + } + + public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + int totalPingsRequested = numPings + preFill; + + // Generate a unique correlation id to put on the messages before sending them, if one was not specified. + if (messageCorrelationId == null) + { + messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); + + setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested)); + } + + try + { + // NDC.push("prod"); + + PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId); + + // Set up the current time as the start time for pinging on the correlation id. This is used to determine + // timeouts. + perCorrelationId.timeOutStart = System.nanoTime(); + + // Send the specifed number of messages for this test + pingNoWaitForReply(message, numPings, messageCorrelationId); + + boolean timedOut; + boolean allMessagesReceived; + int numReplies; + + // We don't have a consumer so don't try and wait for the messages. + // this does mean that if the producerSession is !TXed then we may + // get to exit before all msgs have been received. + // + // Return the number of requested messages, this will let the test + // report a pass. + if (_noOfConsumers == 0 || _sendOnly) + { + return getExpectedNumPings(totalPingsRequested); + } + + do + { + // Block the current thread until replies to all the messages are received, or it times out. + perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); + + // Work out how many replies were receieved. + numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount(); + + allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested); + + // log.debug("numReplies = " + numReplies); + // log.debug("allMessagesReceived = " + allMessagesReceived); + + // Recheck the timeout condition. + long now = System.nanoTime(); + long lastMessageReceievedAt = perCorrelationId.timeOutStart; + timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); + + // log.debug("now = " + now); + // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + } + while (!timedOut && !allMessagesReceived); + + if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose) + { + log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + } + else if (_verbose) + { + log.info("Got all replies on id, " + messageCorrelationId); + } + + // commitTx(_consumerSession); + + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + + return numReplies; + } + // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, + // so will be a memory leak if this is not done. + finally + { + // NDC.pop(); + perCorrelationIds.remove(messageCorrelationId); + } + } + + /** + * Sends the specified number of ping messages and does not wait for correlating replies. + * + * @param message The message to send. + * @param numPings The number of pings to send. + * @param messageCorrelationId A correlation id to place on all messages sent. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException + { + /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + // If we are runnning a consumeOnly test then don't send any messages + if (_consumeOnly) + { + return; + } + + if (message == null) + { + message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + } + + message.setJMSCorrelationID(messageCorrelationId); + + // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the + // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is + // needed. + boolean committed = false; + + // Send all of the ping messages. + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + // Send the message, passing in the message count. + committed = sendMessage(i, message); + + // Spew out per message timings on every message sonly in verbose mode. + /*if (_verbose) + { + log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + }*/ + } + + // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. + if (!committed) + { + commitTx(_producerSession); + } + } + + /** + * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of + * messages sent so far must be specified and is used to round robin the ping destinations (where there are more + * than one), and to determine if the transaction batch size has been reached and the sent messages should be + * committed. + * + * @param i The count of messages sent so far in a loop of multiple calls to this send method. + * @param message The message to send. + * + * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise. + * + * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. + */ + protected boolean sendMessage(int i, Message message) throws JMSException + { + try + { + NDC.push("id" + instanceId + "/prod"); + + // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // log.debug("_txBatchSize = " + _txBatchSize); + + // Round robin the destinations as the messages are sent. + Destination destination = _pingDestinations.get(i % _pingDestinations.size()); + + // Prompt the user to kill the broker when doing failover testing. + _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); + + // Get the test setup for the correlation id. + String correlationID = message.getJMSCorrelationID(); + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + // If necessary, wait until the max pending message size comes within its limit. + if (_maxPendingSize > 0) + { + synchronized (_sendPauseMonitor) + { + // Used to keep track of the number of times that send has to wait. + int numWaits = 0; + + // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with + // the test timeout. + int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); + + while (true) + { + // Get the size estimate of sent but not yet received messages. + int unreceived = _unreceived.get(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + // log.debug("_maxPendingSize = " + _maxPendingSize); + + if (unreceivedSize > _maxPendingSize) + { + // log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Fail the test if the send has had to wait more than the maximum allowed number of times. + if (numWaits > waitLimit) + { + String errorMessage = + "Send has had to wait for the unreceivedSize (" + unreceivedSize + + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit + + " times."; + log.warn(errorMessage); + throw new RuntimeException(errorMessage); + } + + // Wait on the send pause barrier for the limit to be re-established. + try + { + long start = System.nanoTime(); + // _sendPauseMonitor.wait(10000); + _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); + long end = System.nanoTime(); + + // Count the wait only if it was for > 99% of the requested wait time. + if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) + { + numWaits++; + } + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + else + { + break; + } + } + } + } + + // Send the message either to its round robin destination, or its default destination. + // int num = numSent.incrementAndGet(); + // message.setIntProperty("MSG_NUM", num); + setTimestamp(message); + + if (destination == null) + { + _producer.send(message); + } + else + { + _producer.send(destination, message); + } + + // Increase the unreceived size, this may actually happen after the message is received. + // The unreceived size is incremented by the number of consumers that will get a copy of the message, + // in pub/sub mode. + if (_maxPendingSize > 0) + { + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + // log.debug("newUnreceivedCount = " + newUnreceivedCount); + } + + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) + { + _rateLimiter.throttle(); + } + + // Call commit every time the commit batch size is reached. + boolean committed = false; + + // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. + if (((i + 1) % _txBatchSize) == 0) + { + // log.debug("Trying commit on producer session."); + committed = commitTx(_producerSession); + } + + return committed; + } + finally + { + NDC.clear(); + } + } + + /** + * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the + * test that the failure has occurred, before the method returns. + * + * @param failFlag The fail flag to test. + * + * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only + * used once, then reset. + */ + private boolean waitForUserToPromptOnFailure(boolean failFlag) + { + if (failFlag) + { + if (_failOnce) + { + failFlag = false; + } + + // log.debug("Failing Before Send"); + waitForUser(KILL_BROKER_PROMPT); + } + + return failFlag; + } + + /** + * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch + * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will + * terminate the pinger. + */ + public void pingLoop() + { + try + { + // Generate a sample message and time stamp it. + Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); + // setTimestamp(msg); + + // Send the message and wait for a reply. + pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); + } + catch (JMSException e) + { + _publish = false; + // log.debug("There was a JMSException: " + e.getMessage(), e); + } + catch (InterruptedException e) + { + _publish = false; + // log.debug("There was an interruption: " + e.getMessage(), e); + } + } + + /** + * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set + * here. + * + * @param messageListener The chained message listener. + */ + public void setChainedMessageListener(ChainedMessageListener messageListener) + { + _chainedMessageListener = messageListener; + } + + /** Removes any chained message listeners from this pinger. */ + public void removeChainedMessageListener() + { + _chainedMessageListener = null; + } + + /** + * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. + * + * @param replyQueue The reply-to destination for the message. + * @param messageSize The desired size of the message in bytes. + * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise. + * + * @return A freshly generated test message. + * + * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. + */ + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); + return TestUtils.createTestMessageOfSize(_producerSession, messageSize); + } + + /** + * Sets the current time in nanoseconds as the timestamp on the message. + * + * @param msg The message to timestamp. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected void setTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); + } + else + {*/ + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + // } + } + + /** + * Extracts the nanosecond timestamp from a message. + * + * @param msg The message to extract the time stamp from. + * + * @return The timestamp in nanos. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected long getTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); + + return (value == null) ? 0L : value; + } + else + {*/ + return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + // } + } + + /** + * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag + * has been cleared. + */ + public void stop() + { + _publish = false; + } + + /** + * Starts the producer and consumer connections. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void start() throws JMSException + { + // log.debug("public void start(): called"); + + _connection.start(); + // log.debug("Producer started."); + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i].start(); + // log.debug("Consumer " + i + " started."); + } + } + + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ + public void run() + { + // Keep running until the publish flag is cleared. + while (_publish) + { + pingLoop(); + } + } + + /** + * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the + * connection, this clears the publish flag which in turn will halt the ping loop. + * + * @param e The exception that triggered this callback method. + */ + public void onException(JMSException e) + { + // log.debug("public void onException(JMSException e = " + e + "): called", e); + _publish = false; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered + * with the runtime system as a shutdown hook. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + } + }); + } + + /** + * Closes all of the producer and consumer connections. + * + * @throws JMSException All JMSException are allowed to fall through. + */ + public void close() throws JMSException + { + // log.debug("public void close(): called"); + + try + { + if (_connection != null) + { + // log.debug("Before close producer connection."); + _connection.close(); + // log.debug("Closed producer connection."); + } + + for (int i = 0; i < _noOfConsumers; i++) + { + if (_consumerConnection[i] != null) + { + // log.debug("Before close consumer connection " + i + "."); + _consumerConnection[i].close(); + // log.debug("Closed consumer connection " + i + "."); + } + } + } + finally + { + _connection = null; + _producerSession = null; + _consumerSession = null; + _consumerConnection = null; + _producer = null; + _consumer = null; + _pingDestinations = null; + _replyDestination = null; + } + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a + * transactional controlSession, this method does nothing (unless the failover after send flag is set). + * + * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is + * applied. This flag applies whether the pinger is transactional or not. + * + * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit + * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the + * commit is applied. These flags will only apply if using a transactional pinger. + * + * @param session The controlSession to commit + * + * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional and + * non-transactional alike. + */ + protected boolean commitTx(Session session) throws JMSException + { + // log.debug("protected void commitTx(Session session): called"); + + boolean committed = false; + + _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); + + if (session.getTransacted()) + { + // log.debug("Session is transacted."); + + try + { + _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); + + long start = System.nanoTime(); + session.commit(); + committed = true; + // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); + + _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); + + // log.debug("Session Commited."); + } + catch (JMSException e) + { + // log.debug("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + // log.debug("Message rolled back."); + } + catch (JMSException jmse) + { + // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + + return committed; + } + + /** + * Outputs a prompt to the console and waits for the user to press return. + * + * @param prompt The prompt to display on the console. + */ + public void waitForUser(String prompt) + { + System.out.println(prompt); + + try + { + System.in.read(); + } + catch (IOException e) + { + // Ignored. + } + + System.out.println("Continuing."); + } + + /** + * Gets the number of consumers that are listening to each destination in the test. + * + * @return int The number of consumers subscribing to each topic. + */ + public int getConsumersPerDestination() + { + return _noOfConsumers; + } + + /** + * Calculates how many pings are expected to be received for the given number sent. + * + * Note : that if you have set noConsumers to 0 then this will also return 0 + * in the case of PubSub testing. This is correct as without consumers there + * will be no-one to receive the sent messages so they will be unable to respond. + * + * @param numpings The number of pings that will be sent. + * + * @return The number that should be received, for the test to pass. + */ + public int getExpectedNumPings(int numpings) + { + // Wow, I'm freaking sorry about this return here... + return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) * + (_isPubSub ? getConsumersPerDestination() : 1); + } + + /** + * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link + * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link + * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of + * messages with that correlation id. + * + * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be + * given unique message counts. It will always be called while the producer waiting for all messages to arrive is + * still blocked. + */ + public static interface ChainedMessageListener + { + /** + * Notifies interested listeners about message arrival and important test stats, the number of messages + * remaining in the test, and the messages send timestamp. + * + * @param message The newly arrived message. + * @param remainingCount The number of messages left to complete the test. + * @param latency The nanosecond latency of the message. + * + * @throws JMSException Any JMS exceptions is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException; + } + + /** + * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be + * added to this: read/write lock to make onMessage more concurrent as described in class header comment. + */ + protected static class PerCorrelationId + { + /** Holds a countdown on number of expected messages. */ + CountDownLatch trafficLight; + + /** Holds the last timestamp that the timeout was reset to. */ + Long timeOutStart; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java new file mode 100644 index 0000000000..009254c612 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run + * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from + * a producer to a conumer, then the consumer replies to the message on a temporary queue. + * + * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number + * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled + * up using a suitable JUnit test runner. See {@link org.apache.qpid.junit.extensions.TKTestRunner} for more + * information on how to do this. + * + * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads + * gets its own connection/producer/consumer, this is only re-established if the connection is lost. + * + * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come + * back on the temporary queue. + * + * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingPongTestPerf extends AsymptoticTestCase +{ + private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. + // private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingPongTestPerf(String name) + { + super(name); + + _logger.debug(testParameters); + + // Sets up the test parameters with defaults. + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.PING_QUEUE_NAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingPongTestPerf("testPingPongOk")); + + return suite; + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingPongOk(int numPings) throws Exception + { + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the message and wait for a reply. + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != numPings) + { + Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); + + synchronized (this) + { + // Establish a bounce back client on the ping queue to bounce back the pings. + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); + + // Start the connections for client and producer running. + perThreadSetup._testPingBouncer.getConnection().start(); + + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); + perThreadSetup._testPingProducer.start(); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + // perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping-pong producer. + */ + private PingPongProducer _testPingProducer; + + /** + * Holds the test ping client. + */ + private PingPongBouncer _testPingBouncer; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java new file mode 100644 index 0000000000..e2e97ab6f8 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java @@ -0,0 +1,207 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.testcases; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; + +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import java.util.LinkedList; + +/** + * MessageThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of + * the time required to receive samples consisting of batches of messages. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Measure message throughput accross a test circuit. <td> {@link Circuit} + * </table> + * + * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as + * the publishing end. + * + * @todo Set this up to run with zero sized tests. Size zero means send forever. Continuous sending to be interrupted + * by completion of the test duration, or shutdown hook when the user presses Ctrl-C. + */ +public class MessageThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(MessageThroughputPerf.class); + + /** Holds the timing controller, used to log test timings from self-timed tests. */ + private TimingController timingController; + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + /** + * Creates a new test case with the specified name. + * + * @param name The test case name. + */ + public MessageThroughputPerf(String name) + { + super(name); + } + + /** + * Performs the a basic P2P test case. + * + * @param numMessages The number of messages to send in the test. + */ + public void testThroughput(int numMessages) + { + log.debug("public void testThroughput(): called"); + + PerThreadSetup setup = threadSetup.get(); + assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList<Assertion>())); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); + + return "DEFAULT_CIRCUIT_TEST"; + } + + /** + * Used by test runners that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the + * controller on an aware test. + * + * @param controller The timing controller. + */ + public void setTimingController(TimingController controller) + { + timingController = controller; + } + + /** + * Overrides the parent setUp method so that the in-vm broker creation is not done on a per test basis. + * + * @throws Exception Any exceptions allowed to fall through and fail the test. + */ + protected void setUp() throws Exception + { + NDC.push(getName()); + + setTestProps(TestContextProperties.getInstance(MessagingTestConfigProperties.defaults)); + } + + /** + * Overrides the parent setUp method so that the in-vm broker clean-up is not done on a per test basis. + */ + protected void tearDown() + { + NDC.pop(); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + // Run the test setup tasks. This may create an in-vm broker, if a decorator has injected a task for this. + getTaskHandler().runSetupTasks(); + + // Get the test parameters, any overrides on the command line will have been applied. + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + // Customize the test parameters. + testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); + testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); + + // Get the test circuit factory to create test circuits and run the standard test procedure through. + CircuitFactory circuitFactory = getCircuitFactory(); + + // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. + Circuit testCircuit = circuitFactory.createCircuit(null, testProps); + + // Store the test configuration for the thread. + PerThreadSetup setup = new PerThreadSetup(); + setup.testCircuit = testCircuit; + threadSetup.set(setup); + } + + /** + * Called after all threads have completed their setup. + */ + public void postThreadSetUp() + { + //Nothing to do here, potentially implement preFill as per PingTestPerf. + } + + /** + * Called when a test thread is destroyed. + */ + public void threadTearDown() + { + // Run the test teardown tasks. This may destroy the in-vm broker, if a decorator has injected a task for this. + getTaskHandler().runSetupTasks(); + } + + /** + * Holds the per-thread test configurations. + */ + protected static class PerThreadSetup + { + /** Holds the test circuit to run tests on. */ + Circuit testCircuit; + } + + /** + * Compiles all the tests in this class into a suite. + * + * @return The test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); + + suite.addTest(new MessageThroughputPerf("testThroughput")); + + return suite; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java new file mode 100644 index 0000000000..d5c0979399 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -0,0 +1,326 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.config.Connector; +import org.apache.qpid.config.AbstractConfig; + +import javax.jms.Connection; + +public class Config extends AbstractConfig implements ConnectorConfig +{ + + private String host = "localhost"; + private int port = 5672; + private String factory = null; + + private int payload = 256; + private int messages = 1000; + private int clients = 1; + private int batch = 1; + private long delay = 1; + private int warmup; + private int ackMode= AMQSession.NO_ACKNOWLEDGE; + private String clientId; + private String subscriptionId; + private String selector; + private String destinationName; + private boolean persistent; + private boolean transacted; + private int destinationsCount; + private int batchSize; + private int rate; + private boolean ispubsub; + private long timeout; + + public Config() + { + } + + public int getAckMode() + { + return ackMode; + } + + public void setPayload(int payload) + { + this.payload = payload; + } + + public int getPayload() + { + return payload; + } + + void setClients(int clients) + { + this.clients = clients; + } + + int getClients() + { + return clients; + } + + void setMessages(int messages) + { + this.messages = messages; + } + + public int getMessages() + { + return messages; + } + + public int getBatchSize() + { + return batchSize; + } + + public int getRate() + { + return rate; + } + + public int getDestinationsCount() + { + return destinationsCount; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public String getFactory() + { + return factory; + } + + public void setPort(int port) + { + this.port = port; + } + + int getBatch() + { + return batch; + } + + void setBatch(int batch) + { + this.batch = batch; + } + + int getWarmup() + { + return warmup; + } + + void setWarmup(int warmup) + { + this.warmup = warmup; + } + + public long getDelay() + { + return delay; + } + + public void setDelay(long delay) + { + this.delay = delay; + } + + public long getTimeout() + { + return timeout; + } + + public void setTimeout(long time) + { + this.timeout = time; + } + + public String getClientId() + { + return clientId; + } + + public String getSubscriptionId() + { + return subscriptionId; + } + + public String getSelector() + { + return selector; + } + + public String getDestination() + { + return destinationName; + } + + public boolean usePersistentMessages() + { + return persistent; + } + + public boolean isTransacted() + { + return transacted; + } + + public boolean isPubSub() + { + return ispubsub; + } + + public void setOption(String key, String value) + { + if("-host".equalsIgnoreCase(key)) + { + setHost(value); + } + else if("-port".equalsIgnoreCase(key)) + { + try + { + setPort(Integer.parseInt(value)); + } + catch(NumberFormatException e) + { + throw new RuntimeException("Bad port number: " + value, e); + } + } + else if("-payload".equalsIgnoreCase(key)) + { + setPayload(parseInt("Bad payload size", value)); + } + else if("-messages".equalsIgnoreCase(key)) + { + setMessages(parseInt("Bad message count", value)); + } + else if("-clients".equalsIgnoreCase(key)) + { + setClients(parseInt("Bad client count", value)); + } + else if("-batch".equalsIgnoreCase(key)) + { + setBatch(parseInt("Bad batch count", value)); + } + else if("-delay".equalsIgnoreCase(key)) + { + setDelay(parseLong("Bad batch delay", value)); + } + else if("-warmup".equalsIgnoreCase(key)) + { + setWarmup(parseInt("Bad warmup count", value)); + } + else if("-ack".equalsIgnoreCase(key)) + { + ackMode = parseInt("Bad ack mode", value); + } + else if("-factory".equalsIgnoreCase(key)) + { + factory = value; + } + else if("-clientId".equalsIgnoreCase(key)) + { + clientId = value; + } + else if("-subscriptionId".equalsIgnoreCase(key)) + { + subscriptionId = value; + } + else if("-persistent".equalsIgnoreCase(key)) + { + persistent = "true".equalsIgnoreCase(value); + } + else if("-transacted".equalsIgnoreCase(key)) + { + transacted = "true".equalsIgnoreCase(value); + } + else if ("-destinationscount".equalsIgnoreCase(key)) + { + destinationsCount = parseInt("Bad destinations count", value); + } + else if ("-batchsize".equalsIgnoreCase(key)) + { + batchSize = parseInt("Bad batch size", value); + } + else if ("-rate".equalsIgnoreCase(key)) + { + rate = parseInt("MEssage rate", value); + } + else if("-pubsub".equalsIgnoreCase(key)) + { + ispubsub = "true".equalsIgnoreCase(value); + } + else if("-selector".equalsIgnoreCase(key)) + { + selector = value; + } + else if("-destinationname".equalsIgnoreCase(key)) + { + destinationName = value; + } + else if("-timeout".equalsIgnoreCase(key)) + { + setTimeout(parseLong("Bad timeout data", value)); + } + else + { + System.out.println("Ignoring unrecognised option: " + key); + } + } + + static String getAckModeDescription(int ackMode) + { + switch(ackMode) + { + case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE"; + case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE"; + case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE"; + case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE"; + case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE"; + } + return "AckMode=" + ackMode; + } + + public Connection createConnection() throws Exception + { + return new Connector().createConnection(this); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java new file mode 100644 index 0000000000..6dcea42bfe --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for + * cross testing the java and cpp clients. + * + * <p/>How the cpp topic_publisher operates: + * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for + * the specified number of test messages to be sent. + * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", + * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The + * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message + * about the number of messages received and how long it took, although the publisher never looks at the message content. + * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", + * which the listener should close its connection and terminate upon receipt of. + * + * @todo I've added lots of field table types in the report message, just to check if the other end can decode them + * correctly. Not really the right place to test this, so remove them from + * {@link #createReportResponseMessage(String)} once a better test exists. + */ +public class Listener implements MessageListener +{ + private static Logger log = Logger.getLogger(Listener.class); + + public static final String CONTROL_TOPIC = "topic_control"; + public static final String RESPONSE_QUEUE = "response"; + + private final Topic _topic; + //private final Topic _control; + + private final Queue _response; + + /** Holds the connection to listen on. */ + private final Connection _connection; + + /** Holds the producer to send control messages on. */ + private final MessageProducer _controller; + + /** Holds the JMS session. */ + private final javax.jms.Session _session; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ + private boolean init; + + /** Holds the count of messages received by this listener. */ + private int count; + + /** Used to hold the start time of the first message. */ + private long start; + private static String clientId; + + Listener(Connection connection, int ackMode, String name) throws Exception + { + log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name + + "): called"); + + _connection = connection; + _session = connection.createSession(false, ackMode); + + if (_session instanceof AMQSession) + { + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC); + //_control = new AMQTopic(CONTROL_TOPIC); + _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE); + } + else + { + _topic = _session.createTopic(CONTROL_TOPIC); + //_control = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + } + + //register for events + if (name == null) + { + log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); + createTopicConsumer().setMessageListener(this); + } + else + { + log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); + createDurableTopicConsumer(name).setMessageListener(this); + } + + _connection.start(); + + _controller = createControlPublisher(); + System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) + + + ((name == null) + ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) + + "..."); + } + + public static void main(String[] argv) throws Exception + { + clientId = "Listener-" + System.currentTimeMillis(); + + NDC.push(clientId); + + Config config = new Config(); + config.setOptions(argv); + + //Connection con = config.createConnection(); + Connection con = + new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() + + "'"); + + if (config.getClientId() != null) + { + con.setClientID(config.getClientId()); + } + + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + + NDC.pop(); + NDC.remove(); + } + + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException + { + log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + log.debug("comp = " + comp); + + boolean result = (comp != null) && comp.equals(value); + log.debug("result = " + result); + + return result; + } + + public void onMessage(Message message) + { + NDC.push(clientId); + + log.debug("public void onMessage(Message message = " + message + "): called"); + + if (!init) + { + start = System.nanoTime() / 1000000; + count = 0; + init = true; + } + + try + { + if (isShutdown(message)) + { + log.debug("Got a shutdown message."); + shutdown(); + } + else if (isReport(message)) + { + log.debug("Got a report request message."); + + // Send the report. + report(); + init = false; + } + } + catch (JMSException e) + { + log.warn("There was a JMSException during onMessage.", e); + } + finally + { + NDC.pop(); + } + } + + Message createReportResponseMessage(String msg) throws JMSException + { + Message message = _session.createTextMessage(msg); + + // Shove some more field table type in the message just to see if the other end can handle it. + message.setBooleanProperty("BOOLEAN", true); + message.setByteProperty("BYTE", (byte) 5); + message.setDoubleProperty("DOUBLE", Math.PI); + message.setFloatProperty("FLOAT", 1.0f); + message.setIntProperty("INT", 1); + message.setShortProperty("SHORT", (short) 1); + message.setLongProperty("LONG", (long) 1827361278); + message.setStringProperty("STRING", "hello"); + + return message; + } + + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + //log.debug("isShutdown = " + result); + + return result; + } + + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + //log.debug("isReport = " + result); + + return result; + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_response); + } + + private void shutdown() + { + try + { + _session.close(); + _connection.stop(); + _connection.close(); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + log.debug("private void report(): called"); + + try + { + String msg = getReport(); + _controller.send(createReportResponseMessage(msg)); + log.debug("Sent report: " + msg); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = ((System.nanoTime() / 1000000) - start); + + return "Received " + count + " in " + time + "ms"; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java new file mode 100644 index 0000000000..4efdc1cb56 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.*; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + */ +class MessageFactory +{ + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + private final Session _session; + private final Topic _topic; + private final Topic _control; + private final byte[] _payload; + + MessageFactory(Session session) throws JMSException + { + this(session, 256); + } + + MessageFactory(Session session, int size) throws JMSException + { + _session = session; + if (session instanceof AMQSession) + { + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control"); + _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control"); + } + else + { + _topic = session.createTopic("topic_control"); + _control = session.createTopic("topictest.control"); + } + + _payload = new byte[size]; + + for (int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return false; + } + } + + Topic getTopic() + { + return _topic; + } + + Message createEventMessage() throws JMSException + { + BytesMessage msg = _session.createBytesMessage(); + msg.writeBytes(_payload); + + return msg; + } + + Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return e.toString(); + } + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageConsumer createControlConsumer() throws Exception + { + return _session.createConsumer(_control); + } + + MessageProducer createTopicPublisher() throws Exception + { + return _session.createProducer(_topic); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_control); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java new file mode 100644 index 0000000000..c3b19b558a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.*; + +public class Publisher implements MessageListener +{ + private final Object _lock = new Object(); + private final Connection _connection; + private final Session _session; + private final MessageFactory _factory; + private final MessageProducer _publisher; + private int _count; + + Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception + { + _connection = connection; + _session = _connection.createSession(false, ackMode); + _factory = new MessageFactory(_session, size); + _publisher = _factory.createTopicPublisher(); + _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + "."); + } + + private void test(Config config) throws Exception + { + test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup()); + } + + private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception + { + _factory.createControlConsumer().setMessageListener(this); + _connection.start(); + + if (warmup > 0) + { + System.out.println("Runing warmup (" + warmup + " msgs)"); + long time = batch(warmup, consumerCount); + System.out.println("Warmup completed in " + time + "ms"); + } + + long[] times = new long[batches]; + for (int i = 0; i < batches; i++) + { + if (i > 0) + { + Thread.sleep(delay * 1000); + } + times[i] = batch(msgCount, consumerCount); + System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms."); + } + + long min = min(times); + long max = max(times); + System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); + + //request shutdown + _publisher.send(_factory.createShutdownMessage()); + + _connection.stop(); + _connection.close(); + } + + private long batch(int msgCount, int consumerCount) throws Exception + { + _count = consumerCount; + long start = System.currentTimeMillis(); + publish(msgCount); + waitForCompletion(consumerCount); + return System.currentTimeMillis() - start; + } + + private void publish(int count) throws Exception + { + + //send events + for (int i = 0; i < count; i++) + { + _publisher.send(_factory.createEventMessage()); + if ((i + 1) % 100 == 0) + { + System.out.println("Sent " + (i + 1) + " messages"); + } + } + + //request report + _publisher.send(_factory.createReportRequestMessage()); + } + + private void waitForCompletion(int consumers) throws Exception + { + System.out.println("Waiting for completion..."); + synchronized (_lock) + { + while (_count > 0) + { + _lock.wait(); + } + } + } + + + public void onMessage(Message message) + { + System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining"); + if (_count == 0) + { + synchronized (_lock) + { + _lock.notify(); + } + } + } + + static long min(long[] times) + { + long min = times.length > 0 ? times[0] : 0; + for (int i = 0; i < times.length; i++) + { + min = Math.min(min, times[i]); + } + return min; + } + + static long max(long[] times) + { + long max = times.length > 0 ? times[0] : 0; + for (int i = 0; i < times.length; i++) + { + max = Math.max(max, times[i]); + } + return max; + } + + static long avg(long[] times, long min, long max) + { + long sum = 0; + for (int i = 0; i < times.length; i++) + { + sum += times[i]; + } + + int adjustment = 0; + + // Remove min and max if we have run enough batches. + if (times.length > 2) + { + sum -= min; + sum -= max; + adjustment = 2; + } + + return (sum / (times.length - adjustment)); + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + int size = config.getPayload(); + int ackMode = config.getAckMode(); + boolean persistent = config.usePersistentMessages(); + new Publisher(con, size, ackMode, persistent).test(config); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java new file mode 100644 index 0000000000..e0c0b00335 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import java.util.ArrayList; +import java.util.HashMap; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.naming.NamingException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class TopicWithSelectorsTransientVolumeTest extends QpidBrokerTestCase +{ + private static final int NUM_MSG_PER_ITERATION = 50;//must be a multiple of 10 + private static final int NUM_ITERATIONS = 1000; + + private static final int NUM_CONSUMERS = 50; + private static final int MSG_SIZE = 1024; + private static final byte[] BYTE_ARRAY = new byte[MSG_SIZE]; + + ArrayList<MyMessageSubscriber> _subscribers = new ArrayList<MyMessageSubscriber>(); + HashMap<String,Long> _queueMsgCounts = new HashMap<String,Long>(); + + private final static Object _lock=new Object(); + private boolean _producerFailed; + private static int _finishedCount; + private static int _failedCount; + + protected void setUp() throws Exception + { + super.setUp(); + init(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + private void init() + { + _finishedCount = 0; + _failedCount = 0; + _producerFailed = false; + _subscribers.clear(); + _queueMsgCounts.clear(); + } + + + private Message createMessage(Session session) throws JMSException + { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(BYTE_ARRAY); + + return message; + } + + /** + * 1 Topic with 50 subscribers using a selector, and 1 producer sending 50,000 1K messages with 90% selector success ratio. + */ + public void test50SubscribersWith90PercentMatched() throws Exception + { + Topic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "test50ConsumersWith10PercentUnmatched"); + + System.out.println("Creating consumers"); + + MyMessageSubscriber sub; + + for(int i=1; i <= NUM_CONSUMERS; i++) + { + sub = new MyMessageSubscriber(topic, "consumer" + i, ((9 * NUM_MSG_PER_ITERATION * NUM_ITERATIONS) / 10)); + _subscribers.add(sub); + } + + System.out.println("Starting consumers"); + for(MyMessageSubscriber s: _subscribers) + { + Thread consumer = new Thread(s); + consumer.start(); + } + + System.out.println("Creating producer"); + MyMessageProducer prod = new MyMessageProducer(topic); + + long startTime = System.currentTimeMillis(); + + System.out.println("Starting producer"); + Thread producer = new Thread(prod); + producer.start(); + + + // Wait for all the messageConsumers to have finished or failed + synchronized (_lock) + { + while (_finishedCount + _failedCount < NUM_CONSUMERS) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + //ignore + } + } + } + + long endTime = System.currentTimeMillis(); + System.out.println("Elapsed time for messaging: " + (endTime-startTime) + "ms"); + + assertFalse("Producer failed to send all messages", _producerFailed); + + //check if all messages received by consumers, or if there were failures + if (_finishedCount != NUM_CONSUMERS) + { + fail(_failedCount + " consumers did not recieve all their expected messages"); + } + + //check if all queue depths were 0 + for(String consumer: _queueMsgCounts.keySet()) + { + long depth = _queueMsgCounts.get(consumer); + assertEquals(consumer + " subscription queue msg count was not 0", 0, depth); + } + + } + + private class MyMessageProducer implements Runnable + { + private TopicConnection _connection; + private TopicSession _session; + private TopicPublisher _messagePublisher; + + public MyMessageProducer(Topic topic) throws JMSException, NamingException + { + _connection = (TopicConnection) getConnection(); + _session = (TopicSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _messagePublisher = _session.createPublisher(topic); + } + + public void run() + { + try + { + for(int iter = 0; iter < NUM_ITERATIONS; iter++) + { + int i = 0; + + //send 90% matching messages + for (; i < (9 * NUM_MSG_PER_ITERATION)/10; i++) + { + Message message = createMessage(_session); + message.setStringProperty("testprop", "true"); + + _messagePublisher.publish(message, DeliveryMode.NON_PERSISTENT, + Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + Thread.yield(); + } + + //send remaining 10% non-matching messages + for (; i < NUM_MSG_PER_ITERATION; i++) + { + Message message = _session.createMessage(); + message.setStringProperty("testprop", "false"); + + _messagePublisher.publish(message, DeliveryMode.NON_PERSISTENT, + Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + Thread.yield(); + } + } + + } + catch (Exception exp) + { + System.out.println("producer: caught an exception, probably exiting before all messages sent"); + exp.printStackTrace(); + synchronized (_lock) + { + _producerFailed=true; + _lock.notifyAll(); + } + } + } + } + + + private class MyMessageSubscriber implements Runnable + { + /* The topic this subscriber is subscribing to */ + private Topic _topic; + private String _consumerName; + private int _outstandingMsgCount; + private TopicConnection _connection; + private TopicSession _session; + private TopicSubscriber _durSub; + + public MyMessageSubscriber(Topic topic, String consumerName, int messageCount) throws JMSException, NamingException + { + _outstandingMsgCount = messageCount; + _topic=topic; + _consumerName = consumerName; + _connection = (TopicConnection) getConnection(); + _session = (TopicSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _durSub = _session.createDurableSubscriber(_topic, _consumerName,"testprop='true'", false); + _connection.start(); + } + + public void run() + { + + boolean failed = false; + do + { + Message m = null; + try + { + m = _durSub.receive(10000); + } + catch (JMSException exp) + { + System.out.println(_consumerName + ": caught an exception handling a received message"); + exp.printStackTrace(); + + failed = true; + break; + } + + Thread.yield(); + + _outstandingMsgCount--; + + if(_outstandingMsgCount % 500 == 0) + { + System.out.println(_consumerName + ": outstanding message count: " + _outstandingMsgCount); + } + + if(m == null) + { + if(_outstandingMsgCount != 0) + { + failed = true; + } + break; + } + } + while(_outstandingMsgCount > 0); + + System.out.println(_consumerName + ": outstanding message count: " + _outstandingMsgCount); + + try + { + AMQQueue subcriptionQueue = new AMQQueue(ExchangeDefaults.TOPIC_EXCHANGE_NAME,"clientid" + ":" + _consumerName); + + ((AMQSession)_session).sync(); + Long depth = ((AMQSession)_session).getQueueDepth(subcriptionQueue); + _queueMsgCounts.put(_consumerName, depth); + + System.out.println(_consumerName + ": completion queue msg count: " + depth); + } + catch (AMQException exp) + { + System.out.println(_consumerName + ": caught an exception determining completion queue depth"); + exp.printStackTrace(); + } + finally + { + try + { + _session.unsubscribe(_consumerName); + } + catch (JMSException e) + { + System.out.println(_consumerName + ": caught an exception whilst unsubscribing"); + e.printStackTrace(); + } + } + + synchronized (_lock) + { + if (_outstandingMsgCount == 0 && !failed) + { + _finishedCount++; + System.out.println(_consumerName + ": finished"); + } + else + { + _failedCount++; + System.out.println(_consumerName + ": failed"); + } + _lock.notifyAll(); + } + + } + } + + //helper method to allow easily running against an external standalone broker +// public static void main(String[] args) throws Exception +// { +// System.setProperty("broker.config", "/dev/null"); +// System.setProperty("broker", "external"); +// System.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); +// System.setProperty("java.naming.provider.url", "test-profiles/test-provider.properties"); +// +// TopicWithSelectorsTransientVolumeTest test = new TopicWithSelectorsTransientVolumeTest(); +// test.init(); +// test.test50SubscribersWith90PercentMatched(); +// test.tearDown(); +// } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/topicselectors.properties b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/topicselectors.properties new file mode 100644 index 0000000000..1f572af58a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/topicselectors.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.default = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'
\ No newline at end of file |