summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-01 14:30:31 +0000
committerRobert Greig <rgreig@apache.org>2007-01-01 14:30:31 +0000
commit438ee20dfe48d89bef82a433d21344cab65380c8 (patch)
tree68f8f427a8b5fe5bcae63d5ab34013b3b173f425
parentefd32a64738195f426b0feeadb93655a80546bbc (diff)
downloadqpid-python-438ee20dfe48d89bef82a433d21344cab65380c8.tar.gz
QPID-232 Added the service request/reply test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@491577 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/perftests/bin/serviceProvidingClient.sh25
-rwxr-xr-xjava/perftests/bin/serviceRequestingClient.sh27
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java201
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java303
4 files changed, 556 insertions, 0 deletions
diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh
new file mode 100755
index 0000000000..207e4439f1
--- /dev/null
+++ b/java/perftests/bin/serviceProvidingClient.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+# usage: just pass in the host(s)
+$JAVA_HOME/bin/java -cp $CP org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ
diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh
new file mode 100755
index 0000000000..7dd3d63c27
--- /dev/null
+++ b/java/perftests/bin/serviceRequestingClient.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# args supplied: <host:port> <num messages>
+thehosts=$1
+shift
+echo $thehosts
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@"
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
new file mode 100644
index 0000000000..ddee643a76
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceProvidingClient
+{
+ private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
+
+ private MessageProducer _destinationProducer;
+
+ private Destination _responseDest;
+
+ private AMQConnection _connection;
+
+ public ServiceProvidingClient(String brokerDetails, String username, String password,
+ String clientName, String virtualPath, String serviceName)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ _connection = new AMQConnection(brokerDetails, username, password,
+ clientName, virtualPath);
+ _connection.setConnectionListener(new ConnectionListener()
+ {
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback");
+ }
+ });
+ final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _logger.info("Service (queue) name is '" + serviceName + "'...");
+
+ AMQQueue destination = new AMQQueue(serviceName);
+
+ MessageConsumer consumer = session.createConsumer(destination,
+ 100, true, false, null);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ public void onMessage(Message message)
+ {
+ //_logger.info("Got message '" + message + "'");
+
+ TextMessage tm = (TextMessage) message;
+
+ try
+ {
+ Destination responseDest = tm.getJMSReplyTo();
+ if (responseDest == null)
+ {
+ _logger.info("Producer not created because the response destination is null.");
+ return;
+ }
+
+ if (!responseDest.equals(_responseDest))
+ {
+ _responseDest = responseDest;
+
+ _logger.info("About to create a producer");
+ _destinationProducer = session.createProducer(responseDest);
+ _destinationProducer.setDisableMessageTimestamp(true);
+ _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _logger.info("After create a producer");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error creating destination");
+ }
+ _messageCount++;
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Received message total: " + _messageCount);
+ _logger.info("Sending response to '" + _responseDest + "'");
+ }
+
+ try
+ {
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+ TextMessage msg = session.createTextMessage(payload);
+ if (tm.propertyExists("timeSent"))
+ {
+ _logger.info("timeSent property set on message");
+ _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
+ msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+ }
+ _destinationProducer.send(msg);
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Sent response to '" + _responseDest + "'");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error sending message: " + e, e);
+ }
+ }
+ });
+ }
+
+ public void run() throws JMSException
+ {
+ _connection.start();
+ _logger.info("Waiting...");
+ }
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length < 5)
+ {
+ System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+ System.exit(1);
+ }
+ String clientId = null;
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ clientId = address.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+
+ try
+ {
+ ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
+ clientId, args[3], args[4]);
+ client.run();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+
+
+
+ }
+
+}
+
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
new file mode 100644
index 0000000000..b52d06558a
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ *
+ */
+public class ServiceRequestingClient implements ExceptionListener
+{
+ private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
+
+ private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private String MESSAGE_DATA;
+
+ private AMQConnection _connection;
+
+ private Session _session;
+
+ private long _averageLatency;
+
+ private int _messageCount;
+
+ private volatile boolean _completed;
+
+ private AMQDestination _tempDestination;
+
+ private MessageProducer _producer;
+
+ private Object _waiter;
+
+ private static String createMessagePayload(int size)
+ {
+ _log.info("Message size set to " + size + " bytes");
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count < size + MESSAGE_DATA_BYTES.length())
+ {
+ buf.append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.length();
+ }
+ if (count < size)
+ {
+ buf.append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+
+ private class CallbackHandler implements MessageListener
+ {
+ private int _expectedMessageCount;
+
+ private int _actualMessageCount;
+
+ private long _startTime;
+
+ public CallbackHandler(int expectedMessageCount, long startTime)
+ {
+ _expectedMessageCount = expectedMessageCount;
+ _startTime = startTime;
+ }
+
+ public void onMessage(Message m)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message received: " + m);
+ }
+ try
+ {
+ m.getPropertyNames();
+ if (m.propertyExists("timeSent"))
+ {
+ long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
+ long now = System.currentTimeMillis();
+ if (_averageLatency == 0)
+ {
+ _averageLatency = now - timeSent;
+ _log.info("Latency " + _averageLatency);
+ }
+ else
+ {
+ _log.info("Individual latency: " + (now - timeSent));
+ _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+ _log.info("Average latency now: " + _averageLatency);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error getting latency data: " + e, e);
+ }
+ _actualMessageCount++;
+ if (_actualMessageCount % 1000 == 0)
+ {
+ _log.info("Received message count: " + _actualMessageCount);
+ }
+
+ if (_actualMessageCount == _expectedMessageCount)
+ {
+ _completed = true;
+ notifyWaiter();
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+ try
+ {
+ _connection.close();
+ _log.info("Connection closed");
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error closing connection");
+ }
+ }
+ }
+ }
+
+ private void notifyWaiter()
+ {
+ if (_waiter != null)
+ {
+ synchronized (_waiter)
+ {
+ _waiter.notify();
+ }
+ }
+ }
+ public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
+ String vpath, String commandQueueName,
+ final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
+ {
+ _messageCount = messageCount;
+ MESSAGE_DATA = createMessagePayload(messageDataLength);
+ try
+ {
+ createConnection(brokerHosts, clientID, username, password, vpath);
+ _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _connection.setExceptionListener(this);
+
+
+ AMQQueue destination = new AMQQueue(commandQueueName);
+ _producer = (MessageProducer) _session.createProducer(destination);
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ _tempDestination = new AMQQueue("TempResponse" +
+ Long.toString(System.currentTimeMillis()), true);
+ MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
+ true, null);
+
+ //Send first message, then wait a bit to allow the provider to get initialised
+ TextMessage first = _session.createTextMessage(MESSAGE_DATA);
+ first.setJMSReplyTo(_tempDestination);
+ _producer.send(first);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ //now start the clock and the test...
+ final long startTime = System.currentTimeMillis();
+
+ messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ /**
+ * Run the test and notify an object upon receipt of all responses.
+ * @param waiter the object that will be notified
+ * @throws JMSException
+ */
+ public void run(Object waiter) throws JMSException
+ {
+ _waiter = waiter;
+ _connection.start();
+ for (int i = 1; i < _messageCount; i++)
+ {
+ TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+ msg.setJMSReplyTo(_tempDestination);
+ if (i % 1000 == 0)
+ {
+ long timeNow = System.currentTimeMillis();
+ msg.setStringProperty("timeSent", String.valueOf(timeNow));
+ }
+ _producer.send(msg);
+ }
+ _log.info("Finished sending " + _messageCount + " messages");
+ }
+
+ public boolean isCompleted()
+ {
+ return _completed;
+ }
+
+ private void createConnection(String brokerHosts, String clientID, String username, String password,
+ String vpath) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(brokerHosts, username, password,
+ clientID, vpath);
+ }
+
+ /**
+ * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+ * means the server will allocate a name.
+ */
+ public static void main(String[] args)
+ {
+ if (args.length < 6)
+ {
+ System.err.println(
+ "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+ }
+ try
+ {
+ int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+ ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
+ args[4], Integer.parseInt(args[5]),
+ messageDataLength);
+ Object waiter = new Object();
+ client.run(waiter);
+ synchronized (waiter)
+ {
+ while (!client.isCompleted())
+ {
+ waiter.wait();
+ }
+ }
+
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (Exception e)
+ {
+ System.err.println("Error in client: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace(System.err);
+ }
+}