summaryrefslogtreecommitdiff
path: root/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java')
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java296
1 files changed, 161 insertions, 135 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
index 02f011f1b9..6dd8b7e1ca 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
@@ -28,10 +28,12 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.tools.TestConfiguration.MessageType;
import org.apache.qpid.tools.report.BasicReporter;
import org.apache.qpid.tools.report.Reporter;
@@ -42,140 +44,164 @@ import org.slf4j.LoggerFactory;
public class QpidReceive implements MessageListener
{
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- private final CountDownLatch testCompleted = new CountDownLatch(1);
-
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageConsumer consumer;
- private boolean transacted = false;
- private boolean isRollback = false;
- private int txSize = 0;
- private int rollbackFrequency = 0;
- private int ackFrequency = 0;
- private int expected = 0;
- private int received = 0;
- private Reporter report;
- private TestConfiguration config;
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else if (config.getAckFrequency() > 0)
- {
- session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
- isRollback = config.getRollbackFrequency() > 0;
- rollbackFrequency = config.getRollbackFrequency();
- ackFrequency = config.getAckFrequency();
- }
-
- public void resetCounters()
- {
- received = 0;
- expected = 0;
- report.clear();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage &&
- TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
- {
- testCompleted.countDown();
- return;
- }
-
- received++;
- report.message(msg);
-
- if (transacted && (received % txSize == 0))
- {
- if (isRollback && (received % rollbackFrequency == 0))
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- else if (ackFrequency > 0)
- {
- msg.acknowledge();
- }
-
- if (expected >= received)
- {
- testCompleted.countDown();
- }
-
- }
- catch(Exception e)
- {
- _logger.error("Error when receiving messages",e);
- }
-
- }
-
- public void waitforCompletion(int expected) throws Exception
- {
- this.expected = expected;
- testCompleted.await();
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress());
- QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
- receiver.setUp();
- receiver.waitforCompletion(config.getMsgCount());
- if (config.isReportTotal())
- {
- reporter.report();
- }
- receiver.tearDown();
- }
+ private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class);
+ private final CountDownLatch testCompleted = new CountDownLatch(1);
+
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageConsumer consumer;
+ private boolean transacted = false;
+ private boolean isRollback = false;
+ private int txSize = 0;
+ private int rollbackFrequency = 0;
+ private int ackFrequency = 0;
+ private int expected = 0;
+ private int received = 0;
+ private Reporter report;
+ private TestConfiguration config;
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ con.start();
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else if (config.getAckFrequency() > 0)
+ {
+ session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ if (_logger.isDebugEnabled())
+ {
+ System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
+ }
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+ isRollback = config.getRollbackFrequency() > 0;
+ rollbackFrequency = config.getRollbackFrequency();
+ ackFrequency = config.getAckFrequency();
+
+ _logger.debug("Ready address : " + config.getReadyAddress());
+ if (config.getReadyAddress() != null)
+ {
+ MessageProducer prod = session.createProducer(AMQDestination
+ .createDestination(config.getReadyAddress()));
+ prod.send(session.createMessage());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending message to ready address " + prod.getDestination());
+ }
+ }
+ }
+
+ public void resetCounters()
+ {
+ received = 0;
+ expected = 0;
+ report.clear();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage &&
+ TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
+ {
+ testCompleted.countDown();
+ return;
+ }
+
+ received++;
+ report.message(msg);
+
+ if (config.isPrintHeaders())
+ {
+ System.out.println(((AbstractJMSMessage)msg).toHeaderString());
+ }
+
+ if (config.isPrintContent())
+ {
+ System.out.println(((AbstractJMSMessage)msg).toBodyString());
+ }
+
+ if (transacted && (received % txSize == 0))
+ {
+ if (isRollback && (received % rollbackFrequency == 0))
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ else if (ackFrequency > 0)
+ {
+ msg.acknowledge();
+ }
+
+ if (received >= expected)
+ {
+ testCompleted.countDown();
+ }
+
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error when receiving messages",e);
+ }
+ }
+
+ public void waitforCompletion(int expected) throws Exception
+ {
+ this.expected = expected;
+ testCompleted.await();
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(ThroughputAndLatency.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader());
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
+ receiver.setUp();
+ receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ receiver.tearDown();
+ }
}