diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-04-10 11:17:14 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-04-10 11:17:14 +0000 |
commit | a5f02cb6972f84dc116db36bc73c08bdf31d0312 (patch) | |
tree | c7a1176275ddb4ebca5e90fefa8fa370bdcc6675 | |
parent | d981c9ac0638fb04c5594ebbe35852e44a70e73a (diff) | |
download | qpid-python-a5f02cb6972f84dc116db36bc73c08bdf31d0312.tar.gz |
QPID-3941 Refactored existing code in PerfConsumer and PerfProducer to
extract QpidSend and QpidReceive. These classes closely follow the C++
qpid-send.cpp and qpid-receive.cpp
More work needs to be done to make them identical. This is still WIP.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1311676 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java | 181 | ||||
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java | 291 |
2 files changed, 472 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java new file mode 100644 index 0000000000..02f011f1b9 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidReceive implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (expected >= received) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java new file mode 100644 index 0000000000..c058b83d41 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidSend +{ + private Connection con; + private Session session; + private Destination dest; + private MessageProducer producer; + private MessageType msgType; + private Message msg; + private Object payload; + private List<Object> payloads; + private boolean cacheMsg = false; + private boolean randomMsgSize = false; + private boolean durable = false; + private Random random; + private int msgSizeRange = 1024; + private int totalMsgCount = 0; + private boolean rateLimitProducer = false; + private boolean transacted = false; + private int txSize = 0; + + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + Reporter report; + TestConfiguration config; + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + durable = config.isDurable(); + rateLimitProducer = config.getSendRate() > 0 ? true : false; + if (_logger.isDebugEnabled() && rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + + msgType = MessageType.getType(config.getMessageType()); + // if message caching is enabled we pre create the message + // else we pre create the payload + if (config.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(config.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (config.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = config.getMsgSize(); + payloads = new ArrayList<Object>(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(config.getMsgSize()); + } + + producer = session.createProducer(dest); + if (_logger.isDebugEnabled()) + { + System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); + } + producer.setDisableMessageID(config.isDisableMessageID()); + producer.setDisableMessageTimestamp(config.isDisableTimestamp()); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; + + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } + + public void commit() throws Exception + { + session.commit(); + } + + public void send() throws Exception + { + send(config.getMsgCount()); + } + + public void send(int count) throws Exception + { + int sendRate = config.getSendRate(); + if (rateLimitProducer) + { + int iterations = count/sendRate; + int remainder = count%sendRate; + for (int i=0; i < iterations; i++) + { + long iterationStart = Clock.getTime(); + sendMessages(sendRate); + long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs(); + long diff = Clock.SEC - elapsed; + if (diff > 0) + { + // We have sent more messages in a sec than specified by the rate. + Thread.sleep(diff); + } + } + sendMessages(remainder); + } + else + { + sendMessages(count); + } + } + + private void sendMessages(int count) throws Exception + { + boolean isTimestamp = config.isReportLatency(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + if (isTimestamp) + { + msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime()); + } + producer.send(msg); + report.message(msg); + totalMsgCount++; + + if ( transacted && ((totalMsgCount) % txSize == 0)) + { + session.commit(); + } + } + } + + public void resetCounters() + { + totalMsgCount = 0; + report.clear(); + } + + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty(TestConfiguration.EOS, true); + producer.send(msg); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); + sender.setUp(); + sender.send(); + if (config.getSendEOS() > 0) + { + sender.sendEndMessage(); + } + if (config.isReportTotal()) + { + reporter.report(); + } + sender.tearDown(); + } +} |