diff options
Diffstat (limited to 'java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java')
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java | 296 |
1 files changed, 161 insertions, 135 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java index 02f011f1b9..6dd8b7e1ca 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -28,10 +28,12 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.tools.TestConfiguration.MessageType; import org.apache.qpid.tools.report.BasicReporter; import org.apache.qpid.tools.report.Reporter; @@ -42,140 +44,164 @@ import org.slf4j.LoggerFactory; public class QpidReceive implements MessageListener { - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - private final CountDownLatch testCompleted = new CountDownLatch(1); - - private Connection con; - private Session session; - private Destination dest; - private MessageConsumer consumer; - private boolean transacted = false; - private boolean isRollback = false; - private int txSize = 0; - private int rollbackFrequency = 0; - private int ackFrequency = 0; - private int expected = 0; - private int received = 0; - private Reporter report; - private TestConfiguration config; - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } - - public void setUp() throws Exception - { - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else if (config.getAckFrequency() > 0) - { - session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); - - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); - isRollback = config.getRollbackFrequency() > 0; - rollbackFrequency = config.getRollbackFrequency(); - ackFrequency = config.getAckFrequency(); - } - - public void resetCounters() - { - received = 0; - expected = 0; - report.clear(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && - TestConfiguration.EOS.equals(((TextMessage)msg).getText())) - { - testCompleted.countDown(); - return; - } - - received++; - report.message(msg); - - if (transacted && (received % txSize == 0)) - { - if (isRollback && (received % rollbackFrequency == 0)) - { - session.rollback(); - } - else - { - session.commit(); - } - } - else if (ackFrequency > 0) - { - msg.acknowledge(); - } - - if (expected >= received) - { - testCompleted.countDown(); - } - - } - catch(Exception e) - { - _logger.error("Error when receiving messages",e); - } - - } - - public void waitforCompletion(int expected) throws Exception - { - this.expected = expected; - testCompleted.await(); - } - - public void tearDown() throws Exception - { - session.close(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress()); - QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); - receiver.setUp(); - receiver.waitforCompletion(config.getMsgCount()); - if (config.isReportTotal()) - { - reporter.report(); - } - receiver.tearDown(); - } + private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + con.start(); + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + if (_logger.isDebugEnabled()) + { + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + + _logger.debug("Ready address : " + config.getReadyAddress()); + if (config.getReadyAddress() != null) + { + MessageProducer prod = session.createProducer(AMQDestination + .createDestination(config.getReadyAddress())); + prod.send(session.createMessage()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending message to ready address " + prod.getDestination()); + } + } + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (config.isPrintHeaders()) + { + System.out.println(((AbstractJMSMessage)msg).toHeaderString()); + } + + if (config.isPrintContent()) + { + System.out.println(((AbstractJMSMessage)msg).toBodyString()); + } + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (received >= expected) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(ThroughputAndLatency.class, + System.out, + config.reportEvery(), + config.isReportHeader()); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } } |