summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-10 22:44:42 +0000
committerRobert Greig <rgreig@apache.org>2007-01-10 22:44:42 +0000
commit38d016d97fbb0af66ea913f6a96f696b27e63e4b (patch)
treee4b5e0d6ecacabd9b657a3ef5c7adbb0fc26111a
parentcead14d18c252574d9c297acdc8d07961ad073f1 (diff)
downloadqpid-python-38d016d97fbb0af66ea913f6a96f696b27e63e4b.tar.gz
QPID-32 : Add option to run tests with persistent messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495020 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/perftests/bin/serviceProvidingClient.sh2
-rwxr-xr-xjava/perftests/bin/serviceRequestingClient.sh2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java39
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java42
4 files changed, 65 insertions, 20 deletions
diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh
index 3434e6a667..4c51682b58 100755
--- a/java/perftests/bin/serviceProvidingClient.sh
+++ b/java/perftests/bin/serviceProvidingClient.sh
@@ -28,4 +28,4 @@ fi
. ./setupclasspath.sh
echo $CP
# usage: just pass in the host(s)
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ P T
diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh
index 55ae8d5f52..007b927860 100755
--- a/java/perftests/bin/serviceRequestingClient.sh
+++ b/java/perftests/bin/serviceRequestingClient.sh
@@ -30,4 +30,4 @@ echo $thehosts
# XXX -Xms1024m -XX:NewSize=300m
. ./setupclasspath.sh
echo $CP
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@"
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ P T "$@"
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
index ddee643a76..a261d8e5da 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
@@ -42,10 +42,21 @@ public class ServiceProvidingClient
private AMQConnection _connection;
+ private Session _session;
+ private Session _producerSession;
+
+ private boolean _isTransactional;
+
public ServiceProvidingClient(String brokerDetails, String username, String password,
- String clientName, String virtualPath, String serviceName)
+ String clientName, String virtualPath, String serviceName,
+ String deliveryModeString, String transactedMode)
throws AMQException, JMSException, URLSyntaxException
{
+ final int deliveryMode = deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+ _isTransactional = transactedMode.toUpperCase().charAt(0) == 'T' ? true : false;
+
+ _logger.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " + _isTransactional);
+
_connection = new AMQConnection(brokerDetails, username, password,
clientName, virtualPath);
_connection.setConnectionListener(new ConnectionListener()
@@ -74,13 +85,14 @@ public class ServiceProvidingClient
_logger.info("App got failover complete callback");
}
});
- final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
_logger.info("Service (queue) name is '" + serviceName + "'...");
AMQQueue destination = new AMQQueue(serviceName);
- MessageConsumer consumer = session.createConsumer(destination,
+ MessageConsumer consumer = _session.createConsumer(destination,
100, true, false, null);
consumer.setMessageListener(new MessageListener()
@@ -107,9 +119,9 @@ public class ServiceProvidingClient
_responseDest = responseDest;
_logger.info("About to create a producer");
- _destinationProducer = session.createProducer(responseDest);
+ _destinationProducer = _producerSession.createProducer(responseDest);
_destinationProducer.setDisableMessageTimestamp(true);
- _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _destinationProducer.setDeliveryMode(deliveryMode);
_logger.info("After create a producer");
}
}
@@ -127,7 +139,7 @@ public class ServiceProvidingClient
try
{
String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
- TextMessage msg = session.createTextMessage(payload);
+ TextMessage msg = _producerSession.createTextMessage(payload);
if (tm.propertyExists("timeSent"))
{
_logger.info("timeSent property set on message");
@@ -135,6 +147,15 @@ public class ServiceProvidingClient
msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
}
_destinationProducer.send(msg);
+
+ if(_isTransactional)
+ {
+ _producerSession.commit();
+ }
+ if(_isTransactional)
+ {
+ _session.commit();
+ }
if (_messageCount % 1000 == 0)
{
_logger.info("Sent response to '" + _responseDest + "'");
@@ -158,9 +179,9 @@ public class ServiceProvidingClient
{
_logger.info("Starting...");
- if (args.length < 5)
+ if (args.length < 7)
{
- System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+ System.out.println("Usage: brokerDetails username password virtual-path serviceQueue <P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]> [selector]");
System.exit(1);
}
String clientId = null;
@@ -177,7 +198,7 @@ public class ServiceProvidingClient
try
{
ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
- clientId, args[3], args[4]);
+ clientId, args[3], args[4], args[5], args[6]);
client.run();
}
catch (JMSException e)
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
index 93e2d4685b..b58b8eb0ef 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -53,10 +53,12 @@ public class ServiceRequestingClient implements ExceptionListener
private AMQConnection _connection;
private Session _session;
+ private Session _producerSession;
private long _averageLatency;
private int _messageCount;
+ private boolean _isTransactional;
private volatile boolean _completed;
@@ -106,7 +108,7 @@ public class ServiceRequestingClient implements ExceptionListener
}
try
{
- m.getPropertyNames();
+ m.getPropertyNames();
if (m.propertyExists("timeSent"))
{
long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
@@ -123,6 +125,10 @@ public class ServiceRequestingClient implements ExceptionListener
_log.info("Average latency now: " + _averageLatency);
}
}
+ if(_isTransactional)
+ {
+ _session.commit();
+ }
}
catch (JMSException e)
{
@@ -168,24 +174,33 @@ public class ServiceRequestingClient implements ExceptionListener
}
public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
- String vpath, String commandQueueName,
+ String vpath, String commandQueueName,
+ String deliveryModeString, String transactedMode,
final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
{
+ final int deliveryMode = deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT;
+
+ _isTransactional = transactedMode.toUpperCase().charAt(0) == 'T' ? true : false;
+
+ _log.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " + _isTransactional);
+
_messageCount = messageCount;
MESSAGE_DATA = createMessagePayload(messageDataLength);
try
{
createConnection(brokerHosts, clientID, username, password, vpath);
- _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
_connection.setExceptionListener(this);
AMQQueue destination = new AMQQueue(commandQueueName);
- _producer = (MessageProducer) _session.createProducer(destination);
+ _producer = (MessageProducer) _producerSession.createProducer(destination);
_producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _producer.setDeliveryMode(deliveryMode);
_tempDestination = new AMQQueue("TempResponse" +
Long.toString(System.currentTimeMillis()), true);
@@ -196,6 +211,10 @@ public class ServiceRequestingClient implements ExceptionListener
TextMessage first = _session.createTextMessage(MESSAGE_DATA);
first.setJMSReplyTo(_tempDestination);
_producer.send(first);
+ if(_isTransactional)
+ {
+ _producerSession.commit();
+ }
try
{
Thread.sleep(1000);
@@ -227,7 +246,7 @@ public class ServiceRequestingClient implements ExceptionListener
_connection.start();
for (int i = 1; i < _messageCount; i++)
{
- TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+ TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i);
msg.setJMSReplyTo(_tempDestination);
if (i % 1000 == 0)
{
@@ -235,6 +254,11 @@ public class ServiceRequestingClient implements ExceptionListener
msg.setStringProperty("timeSent", String.valueOf(timeNow));
}
_producer.send(msg);
+ if(_isTransactional)
+ {
+ _producerSession.commit();
+ }
+
}
_log.info("Finished sending " + _messageCount + " messages");
}
@@ -260,17 +284,17 @@ public class ServiceRequestingClient implements ExceptionListener
if (args.length < 6)
{
System.err.println(
- "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+ "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]> <number of messages> <message size>");
System.exit(1);
}
try
{
- int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+ int messageDataLength = args.length > 8 ? Integer.parseInt(args[8]) : 4096;
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
- args[4], Integer.parseInt(args[5]),
+ args[4], args[5], args[6], Integer.parseInt(args[7]),
messageDataLength);
Object waiter = new Object();
client.run(waiter);