summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-24 15:08:13 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-24 15:08:13 +0000
commite99bbab21e8364967d325e451d4ebea8be33f955 (patch)
treee9591dbe0f2b333dbae2be7a449f22da6da7b09f
parentb858240d8cf3c9e0c23910ef85e9acdb7af95684 (diff)
downloadqpid-python-e99bbab21e8364967d325e451d4ebea8be33f955.tar.gz
Added commit calls for the received messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499429 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java249
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java33
2 files changed, 156 insertions, 126 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index fa0887b3c9..da63a6ae96 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -157,7 +157,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
- throws Exception
+ throws Exception
{
// Create a connection to the broker.
InetAddress address = InetAddress.getLocalHost();
@@ -217,10 +217,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* @throws Exception All allowed to fall through. This is only test code...
*/
public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
- boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
- boolean beforeSend, boolean failOnce, int batchSize, int noOfDestinations, int rate,
- boolean pubsub) throws Exception
+ String destinationName, String selector, boolean transacted, boolean persistent,
+ int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+ boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
+ int noOfDestinations, int rate, boolean pubsub) throws Exception
{
this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
@@ -245,6 +245,98 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
}
+ private void createPingDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _pingDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _pingDestination = new AMQQueue(name);
+ }
+ }
+
+ /**
+ * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
+ * is created with null destination, so that any destination can be specified while sending
+ *
+ * @throws JMSException
+ */
+ public void createProducer() throws JMSException
+ {
+ if (getDestinationsCount() > 1)
+ {
+ // create producer with initial destination as null for test with multiple-destinations
+ // In this case, a different destination will be used while sending the message
+ _producer = (MessageProducer) getProducerSession().createProducer(null);
+ }
+ else
+ {
+ // Create a producer with known destination to send the pings on.
+ _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
+
+ }
+
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+
+ /**
+ * Creates the temporary destination to listen to the responses
+ *
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumer(String selector) throws JMSException
+ {
+ // Create a temporary destination to get the pongs on.
+ if (isPubSub())
+ {
+ _replyDestination = _consumerSession.createTemporaryTopic();
+ }
+ else
+ {
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ }
+
+ // Create a message consumer to get the replies with and register this to be called back by it.
+ MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
+ *
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumers(String selector) throws JMSException
+ {
+ for (int i = 0; i < getDestinationsCount(); i++)
+ {
+ MessageConsumer consumer =
+ getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
+ consumer.setMessageListener(this);
+ }
+ }
+
+
+ public Session getConsumerSession()
+ {
+ return _consumerSession;
+ }
+
+ public Destination getPingDestination()
+ {
+ return _pingDestination;
+ }
+
+ protected void setPingDestination(Destination destination)
+ {
+ _pingDestination = destination;
+ }
+
/**
* Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
* to be started to bounce the pings back again.
@@ -264,9 +356,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Extract the command line.
if (args.length < 2)
{
- System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] "
- + "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]"
- + " [rate] [pubsub(true/false)]");
+ System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
+ " [rate] [pubsub(true/false)]");
System.exit(0);
}
@@ -319,10 +411,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
// Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_DESTINATION_NAME, null, transacted,
- persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend,
- failOnce, batchSize, 0, rate, ispubsub);
+ PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
+ PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ 0, rate, ispubsub);
pingProducer.getConnection().start();
@@ -341,76 +433,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
/**
- * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
- * is created with null destination, so that any destination can be specified while sending
- *
- * @throws JMSException
- */
- public void createProducer() throws JMSException
- {
- if (getDestinationsCount() > 1)
- {
- // create producer with initial destination as null for test with multiple-destinations
- // In this case, a different destination will be used while sending the message
- _producer = (MessageProducer) getProducerSession().createProducer(null);
- }
- else
- {
- // Create a producer with known destination to send the pings on.
- _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
-
- }
-
- _producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- }
-
- /**
- * Creates the temporary destination to listen to the responses
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumer(String selector) throws JMSException
- {
- // Create a temporary destination to get the pongs on.
- if (isPubSub())
- {
- _replyDestination = _consumerSession.createTemporaryTopic();
- }
- else
- {
- _replyDestination = _consumerSession.createTemporaryQueue();
- }
-
- // Create a message consumer to get the replies with and register this to be called back by it.
- MessageConsumer consumer =
- _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
- }
-
- /**
- * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumers(String selector) throws JMSException
- {
- for (int i = 0; i < getDestinationsCount(); i++)
- {
- MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
- }
- }
-
- public Destination getPingDestination()
- {
- return _pingDestination;
- }
-
- /**
* Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
* on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
* this a few times, in order to prime the JVMs JIT compilation.
@@ -433,9 +455,12 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
Thread.sleep(100);
}
catch (InterruptedException ignore)
- { }
+ {
+
+ }
}
+
}
/**
@@ -463,9 +488,16 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (trafficLight != null)
{
- _logger.debug("Reply was expected, decrementing the latch for the id.");
+ _logger.trace("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
+ long remainingCount = trafficLight.getCount();
+
+ if ((remainingCount % _txBatchSize) == 0)
+ {
+ commitTx(getConsumerSession());
+ }
+
if (_messageListener != null)
{
_messageListener.onMessage(message);
@@ -474,7 +506,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
else
{
- _logger.debug("There was no thread waiting for reply: " + correlationID);
+ _logger.trace("There was no thread waiting for reply: " + correlationID);
}
if (_verbose)
@@ -484,7 +516,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (timestamp != null)
{
long diff = System.currentTimeMillis() - timestamp;
- _logger.info("Time for round trip: " + diff);
+ _logger.trace("Time for round trip: " + diff);
}
}
}
@@ -513,11 +545,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
pingNoWaitForReply(message, numPings, messageCorrelationId);
CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
-
// Block the current thread until a reply to the message is received, or it times out.
trafficLight.await(timeout, TimeUnit.MILLISECONDS);
- // Remove the countdown latch from the map because the map is long lived, so will cause a memory leak.
trafficLights.remove(messageCorrelationId);
// Work out how many replies were receieved.
@@ -532,6 +562,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_logger.info("Got all replies on id, " + messageCorrelationId);
}
+ commitTx(getConsumerSession());
+
return numReplies;
}
@@ -548,8 +580,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* @return The reply, or null if no reply arrives before the timeout.
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
- public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId)
- throws JMSException, InterruptedException
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
{
// Create a count down latch to count the number of replies with. This is created before the message is sent
// so that the message is not received before the count down is created.
@@ -647,6 +678,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
return _replyDestination;
}
+ protected void setReplyDestination(Destination destination)
+ {
+ _replyDestination = destination;
+ }
+
public void setMessageListener(MessageListener messageListener)
{
_messageListener = messageListener;
@@ -657,33 +693,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
return trafficLights.get(correlationID);
}
- protected Session getConsumerSession()
- {
- return _consumerSession;
- }
-
- protected void setPingDestination(Destination destination)
- {
- _pingDestination = destination;
- }
-
- protected void setReplyDestination(Destination destination)
- {
- _replyDestination = destination;
- }
-
- private void createPingDestination(String name)
- {
- if (isPubSub())
- {
- _pingDestination = new AMQTopic(name);
- }
- else
- {
- _pingDestination = new AMQQueue(name);
- }
- }
-
/*
* When the test is being performed with multiple queues, then this method will be used, which has a loop to
* pick up the next queue from the queues list and sends message to it.
@@ -717,10 +726,12 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
public static class FailoverNotifier implements ConnectionListener
{
public void bytesSent(long count)
- { }
+ {
+ }
public void bytesReceived(long count)
- { }
+ {
+ }
public boolean preFailover(boolean redirect)
{
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index edd3c909c6..c76f361f99 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -33,7 +33,7 @@ import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.log4j.Logger;
-import java.util.concurrent.CountDownLatch;
+
public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware
{
@@ -157,7 +157,11 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
//
// try
// {
+// _logger.debug("Sending messages");
+//
// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
+//
+// _logger.debug("All sent");
// }
// catch (JMSException e)
// {
@@ -173,9 +177,11 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// {
// try
// {
-// _logger.info("awating test finish");
+// _logger.debug("Awating test finish");
//
// perThreadSetup._pingItselfClient.getEndLock(correlationID).await();
+//
+//
// }
// catch (InterruptedException e)
// {
@@ -189,9 +195,16 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// _logger.info("Test Finished");
//
// if (numReplies != numPings)
-//
// {
-// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+//
+// try
+// {
+// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
+// }
+// catch (JMSException e)
+// {
+// _logger.error("Error commiting recevied messages", e);
+// }
// try
// {
// _timingController.completeTest(false, numPings - numReplies);
@@ -200,6 +213,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// {
// //ignore
// }
+// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
// }
// }
//
@@ -241,13 +255,13 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
//
// public void onMessage(Message message)
// {
-// _logger.info("Message Recevied");
+// _logger.trace("Message Recevied");
//
// _messageReceived++;
//
// try
// {
-// if (_messageReceived == _batchSize)
+// if (_messageReceived % _batchSize == 0)
// {
// if (_timingController != null)
// {
@@ -257,7 +271,8 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// }
// catch (InterruptedException e)
// {
-// doDone();
+//// _logger.error("Interupted");
+//// doDone();
// }
//
// if (_totalMessages == -1 || _messageReceived == _totalMessages)
@@ -270,6 +285,10 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// private void doDone()
// {
// _done = true;
+//
+// _logger.error("Messages received:" + _messageReceived);
+// _logger.error("Total Messages :" + _totalMessages);
+//
// try
// {
// _timingController.completeTest(true, _totalMessages - _messageReceived);