diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/tools/src | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
8 files changed, 980 insertions, 880 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java index dbc73c404f..de7748acd6 100644 --- a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -1,4 +1,3 @@ -package org.apache.qpid.testkit; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.testkit; * under the License. * */ +package org.apache.qpid.testkit; public interface ErrorHandler { diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java index 4e79dd62a8..7eb83a520b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.tools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * In the future this will be replaced by a Clock abstraction * that can utilize a realtime clock when running in RT Java. @@ -27,6 +30,8 @@ package org.apache.qpid.tools; public class Clock { + private static final Logger _logger = LoggerFactory.getLogger(Clock.class); + public final static long SEC = 60000; private static Precision precision; @@ -54,7 +59,11 @@ public class Clock precision = Precision.getPrecision(System.getProperty("precision","mili")); //offset = Long.getLong("offset",-1); - System.out.println("Using precision : " + precision + " and offset " + offset); + if (_logger.isDebugEnabled()) + { + System.out.println("Using precision : " + precision ); + //+ " and offset " + offset); + } } public static Precision getPrecision() diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java index c6abdf6c84..e0e48519f3 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java @@ -29,383 +29,422 @@ import org.apache.qpid.client.AMQConnection; public class JVMArgConfiguration implements TestConfiguration { - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - private String host = ""; + private String host = ""; - private int port = -1; + private int port = -1; - private String address = "queue; {create : always}"; + private String address = "queue; {create : always}"; - private int msg_size = 1024; + private long timeout = 0; - private int random_msg_size_start_from = 1; + private int msg_size = 1024; - private boolean cacheMessage = false; + private int random_msg_size_start_from = 1; - private boolean disableMessageID = false; + private boolean cacheMessage = false; - private boolean disableTimestamp = false; + private boolean disableMessageID = false; - private boolean durable = false; + private boolean disableTimestamp = false; - private int transaction_size = 0; + private boolean durable = false; - private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private int transaction_size = 0; - private int msg_count = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; - private int warmup_count = 1; + private int msg_count = 10; - private boolean random_msg_size = false; + private int warmup_count = 1; - private String msgType = "bytes"; + private boolean random_msg_size = false; - private boolean printStdDev = false; + private String msgType = "bytes"; - private int sendRate = 0; + private boolean printStdDev = false; - private boolean externalController = false; + private int sendRate = 0; - private boolean useUniqueDest = false; // useful when using multiple connections. + private boolean externalController = false; - private int ackFrequency = 100; + private boolean useUniqueDest = false; // useful when using multiple connections. - private DecimalFormat df = new DecimalFormat("###.##"); + private int ackFrequency = 100; - private int reportEvery = 0; + private DecimalFormat df = new DecimalFormat("###.##"); - private boolean isReportTotal = false; - - private boolean isReportHeader = true; + private int reportEvery = 0; - private boolean isReportLatency = false; - - private int sendEOS = 0; - - private int connectionCount = 1; - - private int rollbackFrequency = 0; - - private boolean printHeaders; - - public JVMArgConfiguration() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - msg_size = Integer.getInteger("msg-size", 1024); - cacheMessage = Boolean.getBoolean("cache-msg"); - disableMessageID = Boolean.getBoolean("disable-message-id"); - disableTimestamp = Boolean.getBoolean("disable-timestamp"); - durable = Boolean.getBoolean("durable"); - transaction_size = Integer.getInteger("tx",1000); - ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg-count",msg_count); - warmup_count = Integer.getInteger("warmup-count",warmup_count); - random_msg_size = Boolean.getBoolean("random-msg-size"); - msgType = System.getProperty("msg-type","bytes"); - printStdDev = Boolean.getBoolean("print-std-dev"); - sendRate = Integer.getInteger("rate",0); - externalController = Boolean.getBoolean("ext-controller"); - useUniqueDest = Boolean.getBoolean("use-unique-dest"); - random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); - reportEvery = Integer.getInteger("report-every"); - isReportTotal = Boolean.getBoolean("report-total"); - isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); - isReportLatency = Boolean.getBoolean("report-latency"); - sendEOS = Integer.getInteger("send-eos"); - connectionCount = Integer.getInteger("con_count",1); - ackFrequency = Integer.getInteger("ack-frequency"); - rollbackFrequency = Integer.getInteger("rollback-frequency"); - printHeaders = Boolean.getBoolean("print-headers"); - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getUrl() - */ - @Override - public String getUrl() - { - return url; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getHost() - */ - @Override - public String getHost() - { - return host; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getPort() - */ - @Override - public int getPort() - { - return port; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAddress() - */ - @Override - public String getAddress() - { - return address; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckMode() - */ - @Override - public int getAckMode() - { - return ack_mode; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() - */ - @Override - public int getMsgCount() - { - return msg_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() - */ - @Override - public int getMsgSize() - { - return msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() - */ - @Override - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDurable() - */ - @Override - public boolean isDurable() - { - return durable; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isTransacted() - */ - @Override - public boolean isTransacted() - { - return transaction_size > 0; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() - */ - @Override - public int getTransactionSize() - { - return transaction_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() - */ - @Override - public int getWarmupCount() - { - return warmup_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() - */ - @Override - public boolean isCacheMessage() - { - return cacheMessage; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() - */ - @Override - public boolean isDisableMessageID() - { - return disableMessageID; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() - */ - @Override - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() - */ - @Override - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMessageType() - */ - @Override - public String getMessageType() - { - return msgType; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() - */ - @Override - public boolean isPrintStdDev() - { - return printStdDev; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getSendRate() - */ - @Override - public int getSendRate() - { - return sendRate; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isExternalController() - */ - @Override - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() - */ - @Override - public boolean isUseUniqueDests() - { - return useUniqueDest; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() - */ - @Override - public int getAckFrequency() - { - return ackFrequency; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#createConnection() - */ - @Override - public Connection createConnection() throws Exception - { - if (getHost().equals("") || getPort() == -1) - { - return new AMQConnection(getUrl()); - } - else - { - return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() - */ - @Override - public DecimalFormat getDecimalFormat() - { - return df; - } - - @Override - public int reportEvery() - { - return reportEvery; - } - - @Override - public boolean isReportTotal() - { - return isReportTotal; - } - - @Override - public boolean isReportHeader() - { - return isReportHeader; - } - - @Override - public boolean isReportLatency() - { - return isReportLatency; - } - - @Override - public int getSendEOS() - { - return sendEOS; - } - - @Override - public int getConnectionCount() - { - return connectionCount; - } - - @Override - public int getRollbackFrequency() - { - return rollbackFrequency; - } - - @Override - public boolean isPrintHeaders() - { - return printHeaders; - } + private boolean isReportTotal = false; + + private boolean isReportHeader = true; + + private int sendEOS = 0; + + private int connectionCount = 1; + + private int rollbackFrequency = 0; + + private boolean printHeaders; + + private boolean printContent; + + private long ttl; + + private int priority; + + private String readyAddress; + + public JVMArgConfiguration() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address",address); + + timeout = Long.getLong("timeout",0); + msg_size = Integer.getInteger("msg-size", 0); + cacheMessage = true; //Boolean.getBoolean("cache-msg"); + disableMessageID = Boolean.getBoolean("disable-message-id"); + disableTimestamp = Boolean.getBoolean("disable-timestamp"); + durable = Boolean.getBoolean("durable"); + transaction_size = Integer.getInteger("tx",1000); + ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg-count",msg_count); + warmup_count = Integer.getInteger("warmup-count",warmup_count); + random_msg_size = Boolean.getBoolean("random-msg-size"); + msgType = System.getProperty("msg-type","bytes"); + printStdDev = Boolean.getBoolean("print-std-dev"); + sendRate = Integer.getInteger("rate",0); + externalController = Boolean.getBoolean("ext-controller"); + useUniqueDest = Boolean.getBoolean("use-unique-dest"); + random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); + reportEvery = Integer.getInteger("report-every",0); + isReportTotal = Boolean.getBoolean("report-total"); + isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); + sendEOS = Integer.getInteger("send-eos",1); + connectionCount = Integer.getInteger("con_count",1); + ackFrequency = Integer.getInteger("ack-frequency",100); + rollbackFrequency = Integer.getInteger("rollback-frequency",0); + printHeaders = Boolean.getBoolean("print-headers"); + printContent = Boolean.getBoolean("print-content"); + ttl = Long.getLong("ttl", 0); + priority = Integer.getInteger("priority", 0); + readyAddress = System.getProperty("ready-address"); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getUrl() + */ + @Override + public String getUrl() + { + return url; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getHost() + */ + @Override + public String getHost() + { + return host; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getPort() + */ + @Override + public int getPort() + { + return port; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAddress() + */ + @Override + public String getAddress() + { + return address; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTimeout() + */ + @Override + public long getTimeout() + { + return timeout; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckMode() + */ + @Override + public int getAckMode() + { + return ack_mode; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() + */ + @Override + public int getMsgCount() + { + return msg_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() + */ + @Override + public int getMsgSize() + { + return msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() + */ + @Override + public int getRandomMsgSizeStartFrom() + { + return random_msg_size_start_from; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDurable() + */ + @Override + public boolean isDurable() + { + return durable; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isTransacted() + */ + @Override + public boolean isTransacted() + { + return transaction_size > 0; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() + */ + @Override + public int getTransactionSize() + { + return transaction_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() + */ + @Override + public int getWarmupCount() + { + return warmup_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() + */ + @Override + public boolean isCacheMessage() + { + return cacheMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() + */ + @Override + public boolean isDisableMessageID() + { + return disableMessageID; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() + */ + @Override + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() + */ + @Override + public boolean isRandomMsgSize() + { + return random_msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMessageType() + */ + @Override + public String getMessageType() + { + return msgType; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() + */ + @Override + public boolean isPrintStdDev() + { + return printStdDev; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getSendRate() + */ + @Override + public int getSendRate() + { + return sendRate; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isExternalController() + */ + @Override + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() + */ + @Override + public boolean isUseUniqueDests() + { + return useUniqueDest; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() + */ + @Override + public int getAckFrequency() + { + return ackFrequency; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#createConnection() + */ + @Override + public Connection createConnection() throws Exception + { + if (getHost().equals("") || getPort() == -1) + { + return new AMQConnection(getUrl()); + } + else + { + return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() + */ + @Override + public DecimalFormat getDecimalFormat() + { + return df; + } + + @Override + public int reportEvery() + { + return reportEvery; + } + + @Override + public boolean isReportTotal() + { + return isReportTotal; + } + + @Override + public boolean isReportHeader() + { + return isReportHeader; + } + + @Override + public int getSendEOS() + { + return sendEOS; + } + + @Override + public int getConnectionCount() + { + return connectionCount; + } + + @Override + public int getRollbackFrequency() + { + return rollbackFrequency; + } + + @Override + public boolean isPrintHeaders() + { + return printHeaders; + } + + @Override + public boolean isPrintContent() + { + return printContent; + } + + @Override + public long getTTL() + { + return ttl; + } + + @Override + public int getPriority() + { + return priority; + } + + @Override + public String getReadyAddress() + { + return readyAddress; + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java index 8ab1379fce..a0ba928e1f 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.tools; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.tools; * under the License. * */ +package org.apache.qpid.tools; import javax.jms.BytesMessage; diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java index 02f011f1b9..6dd8b7e1ca 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -28,10 +28,12 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.tools.TestConfiguration.MessageType; import org.apache.qpid.tools.report.BasicReporter; import org.apache.qpid.tools.report.Reporter; @@ -42,140 +44,164 @@ import org.slf4j.LoggerFactory; public class QpidReceive implements MessageListener { - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - private final CountDownLatch testCompleted = new CountDownLatch(1); - - private Connection con; - private Session session; - private Destination dest; - private MessageConsumer consumer; - private boolean transacted = false; - private boolean isRollback = false; - private int txSize = 0; - private int rollbackFrequency = 0; - private int ackFrequency = 0; - private int expected = 0; - private int received = 0; - private Reporter report; - private TestConfiguration config; - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } - - public void setUp() throws Exception - { - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else if (config.getAckFrequency() > 0) - { - session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); - - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); - isRollback = config.getRollbackFrequency() > 0; - rollbackFrequency = config.getRollbackFrequency(); - ackFrequency = config.getAckFrequency(); - } - - public void resetCounters() - { - received = 0; - expected = 0; - report.clear(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && - TestConfiguration.EOS.equals(((TextMessage)msg).getText())) - { - testCompleted.countDown(); - return; - } - - received++; - report.message(msg); - - if (transacted && (received % txSize == 0)) - { - if (isRollback && (received % rollbackFrequency == 0)) - { - session.rollback(); - } - else - { - session.commit(); - } - } - else if (ackFrequency > 0) - { - msg.acknowledge(); - } - - if (expected >= received) - { - testCompleted.countDown(); - } - - } - catch(Exception e) - { - _logger.error("Error when receiving messages",e); - } - - } - - public void waitforCompletion(int expected) throws Exception - { - this.expected = expected; - testCompleted.await(); - } - - public void tearDown() throws Exception - { - session.close(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress()); - QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); - receiver.setUp(); - receiver.waitforCompletion(config.getMsgCount()); - if (config.isReportTotal()) - { - reporter.report(); - } - receiver.tearDown(); - } + private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + con.start(); + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + if (_logger.isDebugEnabled()) + { + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + + _logger.debug("Ready address : " + config.getReadyAddress()); + if (config.getReadyAddress() != null) + { + MessageProducer prod = session.createProducer(AMQDestination + .createDestination(config.getReadyAddress())); + prod.send(session.createMessage()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending message to ready address " + prod.getDestination()); + } + } + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (config.isPrintHeaders()) + { + System.out.println(((AbstractJMSMessage)msg).toHeaderString()); + } + + if (config.isPrintContent()) + { + System.out.println(((AbstractJMSMessage)msg).toBodyString()); + } + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (received >= expected) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(ThroughputAndLatency.class, + System.out, + config.reportEvery(), + config.isReportHeader()); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java index c058b83d41..3d321dcade 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -43,249 +43,261 @@ import org.slf4j.LoggerFactory; public class QpidSend { - private Connection con; - private Session session; - private Destination dest; - private MessageProducer producer; - private MessageType msgType; - private Message msg; - private Object payload; - private List<Object> payloads; - private boolean cacheMsg = false; - private boolean randomMsgSize = false; - private boolean durable = false; - private Random random; - private int msgSizeRange = 1024; - private int totalMsgCount = 0; - private boolean rateLimitProducer = false; - private boolean transacted = false; - private int txSize = 0; + private Connection con; + private Session session; + private Destination dest; + private MessageProducer producer; + private MessageType msgType; + private Message msg; + private Object payload; + private List<Object> payloads; + private boolean cacheMsg = false; + private boolean randomMsgSize = false; + private boolean durable = false; + private Random random; + private int msgSizeRange = 1024; + private int totalMsgCount = 0; + private boolean rateLimitProducer = false; + private boolean transacted = false; + private int txSize = 0; - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - Reporter report; - TestConfiguration config; + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + Reporter report; + TestConfiguration config; - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } - public void setUp() throws Exception - { - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } + public void setUp() throws Exception + { + con.start(); + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } - durable = config.isDurable(); - rateLimitProducer = config.getSendRate() > 0 ? true : false; - if (_logger.isDebugEnabled() && rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); - } + durable = config.isDurable(); + rateLimitProducer = config.getSendRate() > 0 ? true : false; + if (_logger.isDebugEnabled() && rateLimitProducer) + { + _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); + } - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); - msgType = MessageType.getType(config.getMessageType()); - // if message caching is enabled we pre create the message - // else we pre create the payload - if (config.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(config.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (config.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = config.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); + msgType = MessageType.getType(config.getMessageType()); + // if message caching is enabled we pre create the message + // else we pre create the payload + if (config.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(config.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (config.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = config.getMsgSize(); + payloads = new ArrayList<Object>(msgSizeRange); - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(config.getMsgSize()); - } + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(config.getMsgSize()); + } - producer = session.createProducer(dest); - if (_logger.isDebugEnabled()) - { - System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); - } - producer.setDisableMessageID(config.isDisableMessageID()); - producer.setDisableMessageTimestamp(config.isDisableTimestamp()); - } + producer = session.createProducer(dest); + if (_logger.isDebugEnabled()) + { + _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); + } + producer.setDisableMessageID(config.isDisableMessageID()); + //we add a separate timestamp to allow interoperability with other clients. + producer.setDisableMessageTimestamp(true); + if (config.getTTL() > 0) + { + producer.setTimeToLive(config.getTTL()); + } + if (config.getPriority() > 0) + { + producer.setPriority(config.getPriority()); + } + } - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } - public void commit() throws Exception - { - session.commit(); - } + public void commit() throws Exception + { + session.commit(); + } - public void send() throws Exception - { - send(config.getMsgCount()); - } + public void send() throws Exception + { + send(config.getMsgCount()); + } - public void send(int count) throws Exception - { - int sendRate = config.getSendRate(); - if (rateLimitProducer) - { - int iterations = count/sendRate; - int remainder = count%sendRate; - for (int i=0; i < iterations; i++) - { - long iterationStart = Clock.getTime(); - sendMessages(sendRate); - long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs(); - long diff = Clock.SEC - elapsed; - if (diff > 0) - { - // We have sent more messages in a sec than specified by the rate. - Thread.sleep(diff); - } - } - sendMessages(remainder); - } - else - { - sendMessages(count); - } - } + public void send(int count) throws Exception + { + int sendRate = config.getSendRate(); + if (rateLimitProducer) + { + int iterations = count/sendRate; + int remainder = count%sendRate; + for (int i=0; i < iterations; i++) + { + long iterationStart = System.currentTimeMillis(); + sendMessages(sendRate); + long elapsed = System.currentTimeMillis() - iterationStart; + long diff = Clock.SEC - elapsed; + if (diff > 0) + { + // We have sent more messages in a sec than specified by the rate. + Thread.sleep(diff); + } + } + sendMessages(remainder); + } + else + { + sendMessages(count); + } + } - private void sendMessages(int count) throws Exception - { - boolean isTimestamp = config.isReportLatency(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - if (isTimestamp) - { - msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime()); - } - producer.send(msg); - report.message(msg); - totalMsgCount++; + private void sendMessages(int count) throws Exception + { + boolean isTimestamp = !config.isDisableTimestamp(); + long s = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + if (isTimestamp) + { + msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis()); + } + producer.send(msg); + //report.message(msg); + totalMsgCount++; - if ( transacted && ((totalMsgCount) % txSize == 0)) - { - session.commit(); - } - } - } + if ( transacted && ((totalMsgCount) % txSize == 0)) + { + session.commit(); + } + } + long e = System.currentTimeMillis() - s; + //System.out.println("Rate : " + totalMsgCount/e); + } - public void resetCounters() - { - totalMsgCount = 0; - report.clear(); - } + public void resetCounters() + { + totalMsgCount = 0; + report.clear(); + } - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty(TestConfiguration.EOS, true); - producer.send(msg); - } - - public void tearDown() throws Exception - { - session.close(); - } + public void sendEndMessage() throws Exception + { + Message msg = session.createTextMessage(TestConfiguration.EOS); + producer.send(msg); + } - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress()); - QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); - sender.setUp(); - sender.send(); - if (config.getSendEOS() > 0) - { - sender.sendEndMessage(); - } - if (config.isReportTotal()) - { - reporter.report(); - } - sender.tearDown(); - } + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); + sender.setUp(); + sender.send(); + if (config.getSendEOS() > 0) + { + sender.sendEndMessage(); + } + if (config.isReportTotal()) + { + reporter.report(); + } + sender.tearDown(); + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java index 7f7df0e5e6..18870bac59 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java @@ -26,20 +26,20 @@ import javax.jms.Connection; public interface TestConfiguration { - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) { return MAP; } @@ -47,80 +47,88 @@ public interface TestConfiguration { return OBJECT; }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + public final static String TIMESTAMP = "ts"; + + public final static String EOS = "eos"; + + public final static String SEQUENCE_NUMBER = "sn"; + + public String getUrl(); - public final static String TIMESTAMP = "ts"; + public String getHost(); - public final static String EOS = "eos"; + public int getPort(); - public final static String SEQUENCE_NUMBER = "sn"; + public String getAddress(); - public String getUrl(); + public long getTimeout(); - public String getHost(); + public int getAckMode(); - public int getPort(); + public int getMsgCount(); - public String getAddress(); + public int getMsgSize(); - public int getAckMode(); + public int getRandomMsgSizeStartFrom(); - public int getMsgCount(); + public boolean isDurable(); - public int getMsgSize(); + public boolean isTransacted(); - public int getRandomMsgSizeStartFrom(); + public int getTransactionSize(); - public boolean isDurable(); + public int getWarmupCount(); - public boolean isTransacted(); + public boolean isCacheMessage(); - public int getTransactionSize(); + public boolean isDisableMessageID(); - public int getWarmupCount(); + public boolean isDisableTimestamp(); - public boolean isCacheMessage(); + public boolean isRandomMsgSize(); - public boolean isDisableMessageID(); + public String getMessageType(); - public boolean isDisableTimestamp(); + public boolean isPrintStdDev(); - public boolean isRandomMsgSize(); + public int getSendRate(); - public String getMessageType(); + public boolean isExternalController(); - public boolean isPrintStdDev(); + public boolean isUseUniqueDests(); - public int getSendRate(); + public int getAckFrequency(); - public boolean isExternalController(); + public Connection createConnection() throws Exception; - public boolean isUseUniqueDests(); + public DecimalFormat getDecimalFormat(); - public int getAckFrequency(); + public int reportEvery(); - public Connection createConnection() throws Exception; + public boolean isReportTotal(); - public DecimalFormat getDecimalFormat(); + public boolean isReportHeader(); - public int reportEvery(); + public int getSendEOS(); - public boolean isReportTotal(); + public int getConnectionCount(); - public boolean isReportHeader(); + public int getRollbackFrequency(); - public boolean isReportLatency(); + public boolean isPrintHeaders(); - public int getSendEOS(); + public boolean isPrintContent(); - public int getConnectionCount(); + public long getTTL(); - public int getRollbackFrequency(); + public int getPriority(); - public boolean isPrintHeaders(); + public String getReadyAddress(); }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java index 73efd1f1e0..db8b4ddcee 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java @@ -25,115 +25,121 @@ import java.text.DecimalFormat; import javax.jms.Message; +import org.apache.qpid.tools.TestConfiguration; + public interface Statistics { - public void message(Message msg); - public void report(PrintStream out); - public void header(PrintStream out); - public void clear(); - - static class Throughput implements Statistics - { - DecimalFormat df = new DecimalFormat("###.##"); - int messages = 0; - long start = 0; - boolean started = false; - - @Override - public void message(Message msg) - { - ++messages; - if (!started) - { - start = System.currentTimeMillis(); - started = true; - } - } - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - out.print(df.format((double)messages/(double)elapsed)); - } - - @Override - public void header(PrintStream out) - { - out.print("tp(m/s)"); - } - - public void clear() - { - messages = 0; - start = 0; - started = false; - } - } - - static class ThroughputAndLatency extends Throughput - { - long minLatency = Long.MAX_VALUE; - long maxLatency = Long.MIN_VALUE; - double totalLatency = 0; - int sampleCount = 0; - - @Override - public void message(Message msg) - { - super.message(msg); - try - { - long ts = msg.getLongProperty("ts"); - long latency = System.currentTimeMillis() - ts; - minLatency = Math.min(latency, minLatency); - maxLatency = Math.min(latency, maxLatency); - totalLatency = totalLatency + latency; - sampleCount++; - } - catch(Exception e) - { - System.out.println("Error calculating latency"); - } - } - - @Override - public void report(PrintStream out) - { - super.report(out); - double avgLatency = totalLatency/(double)sampleCount; - out.append('\t') - .append(String.valueOf(minLatency)) - .append('\t') - .append(String.valueOf(maxLatency)) - .append('\t') - .append(df.format(avgLatency)); - - out.flush(); - } - - @Override - public void header(PrintStream out) - { - super.header(out); - out.append('\t') - .append("l-min") - .append('\t') - .append("l-max") - .append('\t') - .append("l-avg"); - - out.flush(); - } - - public void clear() - { - super.clear(); - minLatency = 0; - maxLatency = 0; - totalLatency = 0; - sampleCount = 0; - } - } + public void message(Message msg); + public void report(PrintStream out); + public void header(PrintStream out); + public void clear(); + + static class Throughput implements Statistics + { + DecimalFormat df = new DecimalFormat("###"); + int messages = 0; + long start = 0; + boolean started = false; + + @Override + public void message(Message msg) + { + ++messages; + if (!started) + { + start = System.currentTimeMillis(); + started = true; + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + out.println(df.format((double)messages/(double)elapsed)); + } + + @Override + public void header(PrintStream out) + { + out.println("tp(m/s)"); + } + + public void clear() + { + messages = 0; + start = 0; + started = false; + } + } + + static class ThroughputAndLatency extends Throughput + { + long minLatency = Long.MAX_VALUE; + long maxLatency = Long.MIN_VALUE; + double totalLatency = 0; + int sampleCount = 0; + + @Override + public void message(Message msg) + { + super.message(msg); + try + { + long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP); + long latency = System.currentTimeMillis() - ts; + minLatency = Math.min(latency, minLatency); + maxLatency = Math.max(latency, maxLatency); + totalLatency = totalLatency + latency; + sampleCount++; + } + catch(Exception e) + { + System.out.println("Error calculating latency " + e); + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + double rate = (double)messages/(double)elapsed; + double avgLatency = totalLatency/(double)sampleCount; + out.append("\n") + .append(df.format(rate)) + .append('\t') + .append(String.valueOf(minLatency)) + .append('\t') + .append(String.valueOf(maxLatency)) + .append('\t') + .append(df.format(avgLatency)) + .append("\n"); + + out.flush(); + } + + @Override + public void header(PrintStream out) + { + out.append("tp(m/s)") + .append('\t') + .append("l-min") + .append('\t') + .append("l-max") + .append('\t') + .append("l-avg"); + + out.flush(); + } + + public void clear() + { + super.clear(); + minLatency = 0; + maxLatency = 0; + totalLatency = 0; + sampleCount = 0; + } + } } |