diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-01 14:30:31 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-01 14:30:31 +0000 |
commit | 438ee20dfe48d89bef82a433d21344cab65380c8 (patch) | |
tree | 68f8f427a8b5fe5bcae63d5ab34013b3b173f425 | |
parent | efd32a64738195f426b0feeadb93655a80546bbc (diff) | |
download | qpid-python-438ee20dfe48d89bef82a433d21344cab65380c8.tar.gz |
QPID-232 Added the service request/reply test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@491577 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 556 insertions, 0 deletions
diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh new file mode 100755 index 0000000000..207e4439f1 --- /dev/null +++ b/java/perftests/bin/serviceProvidingClient.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# XXX -Xms1024m -XX:NewSize=300m +. ./setupclasspath.sh +echo $CP +# usage: just pass in the host(s) +$JAVA_HOME/bin/java -cp $CP org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh new file mode 100755 index 0000000000..7dd3d63c27 --- /dev/null +++ b/java/perftests/bin/serviceRequestingClient.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# args supplied: <host:port> <num messages> +thehosts=$1 +shift +echo $thehosts +# XXX -Xms1024m -XX:NewSize=300m +. ./setupclasspath.sh +echo $CP +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@" diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java new file mode 100644 index 0000000000..ddee643a76 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.*; +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class ServiceProvidingClient +{ + private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class); + + private MessageProducer _destinationProducer; + + private Destination _responseDest; + + private AMQConnection _connection; + + public ServiceProvidingClient(String brokerDetails, String username, String password, + String clientName, String virtualPath, String serviceName) + throws AMQException, JMSException, URLSyntaxException + { + _connection = new AMQConnection(brokerDetails, username, password, + clientName, virtualPath); + _connection.setConnectionListener(new ConnectionListener() + { + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback"); + } + }); + final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _logger.info("Service (queue) name is '" + serviceName + "'..."); + + AMQQueue destination = new AMQQueue(serviceName); + + MessageConsumer consumer = session.createConsumer(destination, + 100, true, false, null); + + consumer.setMessageListener(new MessageListener() + { + private int _messageCount; + + public void onMessage(Message message) + { + //_logger.info("Got message '" + message + "'"); + + TextMessage tm = (TextMessage) message; + + try + { + Destination responseDest = tm.getJMSReplyTo(); + if (responseDest == null) + { + _logger.info("Producer not created because the response destination is null."); + return; + } + + if (!responseDest.equals(_responseDest)) + { + _responseDest = responseDest; + + _logger.info("About to create a producer"); + _destinationProducer = session.createProducer(responseDest); + _destinationProducer.setDisableMessageTimestamp(true); + _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _logger.info("After create a producer"); + } + } + catch (JMSException e) + { + _logger.error("Error creating destination"); + } + _messageCount++; + if (_messageCount % 1000 == 0) + { + _logger.info("Received message total: " + _messageCount); + _logger.info("Sending response to '" + _responseDest + "'"); + } + + try + { + String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText(); + TextMessage msg = session.createTextMessage(payload); + if (tm.propertyExists("timeSent")) + { + _logger.info("timeSent property set on message"); + _logger.info("timeSent value is: " + tm.getLongProperty("timeSent")); + msg.setStringProperty("timeSent", tm.getStringProperty("timeSent")); + } + _destinationProducer.send(msg); + if (_messageCount % 1000 == 0) + { + _logger.info("Sent response to '" + _responseDest + "'"); + } + } + catch (JMSException e) + { + _logger.error("Error sending message: " + e, e); + } + } + }); + } + + public void run() throws JMSException + { + _connection.start(); + _logger.info("Waiting..."); + } + + public static void main(String[] args) + { + _logger.info("Starting..."); + + if (args.length < 5) + { + System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]"); + System.exit(1); + } + String clientId = null; + try + { + InetAddress address = InetAddress.getLocalHost(); + clientId = address.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + _logger.error("Error: " + e, e); + } + + try + { + ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2], + clientId, args[3], args[4]); + client.run(); + } + catch (JMSException e) + { + _logger.error("Error: " + e, e); + } + catch (AMQException e) + { + _logger.error("Error: " + e, e); + } + catch (URLSyntaxException e) + { + _logger.error("Error: " + e, e); + } + + + + } + +} + diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java new file mode 100644 index 0000000000..b52d06558a --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jms.MessageConsumer; +import org.apache.qpid.jms.MessageProducer; +import org.apache.qpid.jms.Session; + +import javax.jms.*; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * A client that behaves as follows: + * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li> + * <li>Creates a temporary queue</li> + * <li>Creates messages containing a property that is the name of the temporary queue</li> + * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li> + * </ul> + * + */ +public class ServiceRequestingClient implements ExceptionListener +{ + private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); + + private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; + + private String MESSAGE_DATA; + + private AMQConnection _connection; + + private Session _session; + + private long _averageLatency; + + private int _messageCount; + + private volatile boolean _completed; + + private AMQDestination _tempDestination; + + private MessageProducer _producer; + + private Object _waiter; + + private static String createMessagePayload(int size) + { + _log.info("Message size set to " + size + " bytes"); + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count < size + MESSAGE_DATA_BYTES.length()) + { + buf.append(MESSAGE_DATA_BYTES); + count += MESSAGE_DATA_BYTES.length(); + } + if (count < size) + { + buf.append(MESSAGE_DATA_BYTES, 0, size - count); + } + + return buf.toString(); + } + + private class CallbackHandler implements MessageListener + { + private int _expectedMessageCount; + + private int _actualMessageCount; + + private long _startTime; + + public CallbackHandler(int expectedMessageCount, long startTime) + { + _expectedMessageCount = expectedMessageCount; + _startTime = startTime; + } + + public void onMessage(Message m) + { + if (_log.isDebugEnabled()) + { + _log.debug("Message received: " + m); + } + try + { + m.getPropertyNames(); + if (m.propertyExists("timeSent")) + { + long timeSent = Long.parseLong(m.getStringProperty("timeSent")); + long now = System.currentTimeMillis(); + if (_averageLatency == 0) + { + _averageLatency = now - timeSent; + _log.info("Latency " + _averageLatency); + } + else + { + _log.info("Individual latency: " + (now - timeSent)); + _averageLatency = (_averageLatency + (now - timeSent)) / 2; + _log.info("Average latency now: " + _averageLatency); + } + } + } + catch (JMSException e) + { + _log.error("Error getting latency data: " + e, e); + } + _actualMessageCount++; + if (_actualMessageCount % 1000 == 0) + { + _log.info("Received message count: " + _actualMessageCount); + } + + if (_actualMessageCount == _expectedMessageCount) + { + _completed = true; + notifyWaiter(); + long timeTaken = System.currentTimeMillis() - _startTime; + _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " + + timeTaken + "ms, equivalent to " + + (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second"); + + try + { + _connection.close(); + _log.info("Connection closed"); + } + catch (JMSException e) + { + _log.error("Error closing connection"); + } + } + } + } + + private void notifyWaiter() + { + if (_waiter != null) + { + synchronized (_waiter) + { + _waiter.notify(); + } + } + } + public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password, + String vpath, String commandQueueName, + final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException + { + _messageCount = messageCount; + MESSAGE_DATA = createMessagePayload(messageDataLength); + try + { + createConnection(brokerHosts, clientID, username, password, vpath); + _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + _connection.setExceptionListener(this); + + + AMQQueue destination = new AMQQueue(commandQueueName); + _producer = (MessageProducer) _session.createProducer(destination); + _producer.setDisableMessageTimestamp(true); + _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + _tempDestination = new AMQQueue("TempResponse" + + Long.toString(System.currentTimeMillis()), true); + MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true, + true, null); + + //Send first message, then wait a bit to allow the provider to get initialised + TextMessage first = _session.createTextMessage(MESSAGE_DATA); + first.setJMSReplyTo(_tempDestination); + _producer.send(first); + try + { + Thread.sleep(1000); + } + catch (InterruptedException ignore) + { + } + + //now start the clock and the test... + final long startTime = System.currentTimeMillis(); + + messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime)); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + /** + * Run the test and notify an object upon receipt of all responses. + * @param waiter the object that will be notified + * @throws JMSException + */ + public void run(Object waiter) throws JMSException + { + _waiter = waiter; + _connection.start(); + for (int i = 1; i < _messageCount; i++) + { + TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i); + msg.setJMSReplyTo(_tempDestination); + if (i % 1000 == 0) + { + long timeNow = System.currentTimeMillis(); + msg.setStringProperty("timeSent", String.valueOf(timeNow)); + } + _producer.send(msg); + } + _log.info("Finished sending " + _messageCount + " messages"); + } + + public boolean isCompleted() + { + return _completed; + } + + private void createConnection(String brokerHosts, String clientID, String username, String password, + String vpath) throws AMQException, URLSyntaxException + { + _connection = new AMQConnection(brokerHosts, username, password, + clientID, vpath); + } + + /** + * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank + * means the server will allocate a name. + */ + public static void main(String[] args) + { + if (args.length < 6) + { + System.err.println( + "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>"); + } + try + { + int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096; + + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3], + args[4], Integer.parseInt(args[5]), + messageDataLength); + Object waiter = new Object(); + client.run(waiter); + synchronized (waiter) + { + while (!client.isCompleted()) + { + waiter.wait(); + } + } + + } + catch (UnknownHostException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (Exception e) + { + System.err.println("Error in client: " + e); + e.printStackTrace(); + } + } + + /** + * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) + */ + public void onException(JMSException e) + { + System.err.println(e.getMessage()); + e.printStackTrace(System.err); + } +} |