diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-10 22:44:42 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-10 22:44:42 +0000 |
commit | 38d016d97fbb0af66ea913f6a96f696b27e63e4b (patch) | |
tree | e4b5e0d6ecacabd9b657a3ef5c7adbb0fc26111a | |
parent | cead14d18c252574d9c297acdc8d07961ad073f1 (diff) | |
download | qpid-python-38d016d97fbb0af66ea913f6a96f696b27e63e4b.tar.gz |
QPID-32 : Add option to run tests with persistent messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495020 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 65 insertions, 20 deletions
diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh index 3434e6a667..4c51682b58 100755 --- a/java/perftests/bin/serviceProvidingClient.sh +++ b/java/perftests/bin/serviceProvidingClient.sh @@ -28,4 +28,4 @@ fi . ./setupclasspath.sh echo $CP # usage: just pass in the host(s) -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ P T diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh index 55ae8d5f52..007b927860 100755 --- a/java/perftests/bin/serviceRequestingClient.sh +++ b/java/perftests/bin/serviceRequestingClient.sh @@ -30,4 +30,4 @@ echo $thehosts # XXX -Xms1024m -XX:NewSize=300m . ./setupclasspath.sh echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@" +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ P T "$@" diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java index ddee643a76..a261d8e5da 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java @@ -42,10 +42,21 @@ public class ServiceProvidingClient private AMQConnection _connection; + private Session _session; + private Session _producerSession; + + private boolean _isTransactional; + public ServiceProvidingClient(String brokerDetails, String username, String password, - String clientName, String virtualPath, String serviceName) + String clientName, String virtualPath, String serviceName, + String deliveryModeString, String transactedMode) throws AMQException, JMSException, URLSyntaxException { + final int deliveryMode = deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; + _isTransactional = transactedMode.toUpperCase().charAt(0) == 'T' ? true : false; + + _logger.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " + _isTransactional); + _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath); _connection.setConnectionListener(new ConnectionListener() @@ -74,13 +85,14 @@ public class ServiceProvidingClient _logger.info("App got failover complete callback"); } }); - final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); + _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); _logger.info("Service (queue) name is '" + serviceName + "'..."); AMQQueue destination = new AMQQueue(serviceName); - MessageConsumer consumer = session.createConsumer(destination, + MessageConsumer consumer = _session.createConsumer(destination, 100, true, false, null); consumer.setMessageListener(new MessageListener() @@ -107,9 +119,9 @@ public class ServiceProvidingClient _responseDest = responseDest; _logger.info("About to create a producer"); - _destinationProducer = session.createProducer(responseDest); + _destinationProducer = _producerSession.createProducer(responseDest); _destinationProducer.setDisableMessageTimestamp(true); - _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _destinationProducer.setDeliveryMode(deliveryMode); _logger.info("After create a producer"); } } @@ -127,7 +139,7 @@ public class ServiceProvidingClient try { String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText(); - TextMessage msg = session.createTextMessage(payload); + TextMessage msg = _producerSession.createTextMessage(payload); if (tm.propertyExists("timeSent")) { _logger.info("timeSent property set on message"); @@ -135,6 +147,15 @@ public class ServiceProvidingClient msg.setStringProperty("timeSent", tm.getStringProperty("timeSent")); } _destinationProducer.send(msg); + + if(_isTransactional) + { + _producerSession.commit(); + } + if(_isTransactional) + { + _session.commit(); + } if (_messageCount % 1000 == 0) { _logger.info("Sent response to '" + _responseDest + "'"); @@ -158,9 +179,9 @@ public class ServiceProvidingClient { _logger.info("Starting..."); - if (args.length < 5) + if (args.length < 7) { - System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]"); + System.out.println("Usage: brokerDetails username password virtual-path serviceQueue <P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]> [selector]"); System.exit(1); } String clientId = null; @@ -177,7 +198,7 @@ public class ServiceProvidingClient try { ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2], - clientId, args[3], args[4]); + clientId, args[3], args[4], args[5], args[6]); client.run(); } catch (JMSException e) diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java index 93e2d4685b..b58b8eb0ef 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -53,10 +53,12 @@ public class ServiceRequestingClient implements ExceptionListener private AMQConnection _connection; private Session _session; + private Session _producerSession; private long _averageLatency; private int _messageCount; + private boolean _isTransactional; private volatile boolean _completed; @@ -106,7 +108,7 @@ public class ServiceRequestingClient implements ExceptionListener } try { - m.getPropertyNames(); + m.getPropertyNames(); if (m.propertyExists("timeSent")) { long timeSent = Long.parseLong(m.getStringProperty("timeSent")); @@ -123,6 +125,10 @@ public class ServiceRequestingClient implements ExceptionListener _log.info("Average latency now: " + _averageLatency); } } + if(_isTransactional) + { + _session.commit(); + } } catch (JMSException e) { @@ -168,24 +174,33 @@ public class ServiceRequestingClient implements ExceptionListener } public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password, - String vpath, String commandQueueName, + String vpath, String commandQueueName, + String deliveryModeString, String transactedMode, final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException { + final int deliveryMode = deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT + : DeliveryMode.NON_PERSISTENT; + + _isTransactional = transactedMode.toUpperCase().charAt(0) == 'T' ? true : false; + + _log.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " + _isTransactional); + _messageCount = messageCount; MESSAGE_DATA = createMessagePayload(messageDataLength); try { createConnection(brokerHosts, clientID, username, password, vpath); - _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); + _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE); _connection.setExceptionListener(this); AMQQueue destination = new AMQQueue(commandQueueName); - _producer = (MessageProducer) _session.createProducer(destination); + _producer = (MessageProducer) _producerSession.createProducer(destination); _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _producer.setDeliveryMode(deliveryMode); _tempDestination = new AMQQueue("TempResponse" + Long.toString(System.currentTimeMillis()), true); @@ -196,6 +211,10 @@ public class ServiceRequestingClient implements ExceptionListener TextMessage first = _session.createTextMessage(MESSAGE_DATA); first.setJMSReplyTo(_tempDestination); _producer.send(first); + if(_isTransactional) + { + _producerSession.commit(); + } try { Thread.sleep(1000); @@ -227,7 +246,7 @@ public class ServiceRequestingClient implements ExceptionListener _connection.start(); for (int i = 1; i < _messageCount; i++) { - TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i); + TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i); msg.setJMSReplyTo(_tempDestination); if (i % 1000 == 0) { @@ -235,6 +254,11 @@ public class ServiceRequestingClient implements ExceptionListener msg.setStringProperty("timeSent", String.valueOf(timeNow)); } _producer.send(msg); + if(_isTransactional) + { + _producerSession.commit(); + } + } _log.info("Finished sending " + _messageCount + " messages"); } @@ -260,17 +284,17 @@ public class ServiceRequestingClient implements ExceptionListener if (args.length < 6) { System.err.println( - "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>"); + "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]> <number of messages> <message size>"); System.exit(1); } try { - int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096; + int messageDataLength = args.length > 8 ? Integer.parseInt(args[8]) : 4096; InetAddress address = InetAddress.getLocalHost(); String clientID = address.getHostName() + System.currentTimeMillis(); ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3], - args[4], Integer.parseInt(args[5]), + args[4], args[5], args[6], Integer.parseInt(args[7]), messageDataLength); Object waiter = new Object(); client.run(waiter); |