summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-04-10 11:17:14 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-04-10 11:17:14 +0000
commita5f02cb6972f84dc116db36bc73c08bdf31d0312 (patch)
treec7a1176275ddb4ebca5e90fefa8fa370bdcc6675
parentd981c9ac0638fb04c5594ebbe35852e44a70e73a (diff)
downloadqpid-python-a5f02cb6972f84dc116db36bc73c08bdf31d0312.tar.gz
QPID-3941 Refactored existing code in PerfConsumer and PerfProducer to
extract QpidSend and QpidReceive. These classes closely follow the C++ qpid-send.cpp and qpid-receive.cpp More work needs to be done to make them identical. This is still WIP. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1311676 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java181
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java291
2 files changed, 472 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
new file mode 100644
index 0000000000..02f011f1b9
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.apache.qpid.tools.report.BasicReporter;
+import org.apache.qpid.tools.report.Reporter;
+import org.apache.qpid.tools.report.Statistics.Throughput;
+import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidReceive implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
+ private final CountDownLatch testCompleted = new CountDownLatch(1);
+
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageConsumer consumer;
+ private boolean transacted = false;
+ private boolean isRollback = false;
+ private int txSize = 0;
+ private int rollbackFrequency = 0;
+ private int ackFrequency = 0;
+ private int expected = 0;
+ private int received = 0;
+ private Reporter report;
+ private TestConfiguration config;
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else if (config.getAckFrequency() > 0)
+ {
+ session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+ isRollback = config.getRollbackFrequency() > 0;
+ rollbackFrequency = config.getRollbackFrequency();
+ ackFrequency = config.getAckFrequency();
+ }
+
+ public void resetCounters()
+ {
+ received = 0;
+ expected = 0;
+ report.clear();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage &&
+ TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
+ {
+ testCompleted.countDown();
+ return;
+ }
+
+ received++;
+ report.message(msg);
+
+ if (transacted && (received % txSize == 0))
+ {
+ if (isRollback && (received % rollbackFrequency == 0))
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ else if (ackFrequency > 0)
+ {
+ msg.acknowledge();
+ }
+
+ if (expected >= received)
+ {
+ testCompleted.countDown();
+ }
+
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error when receiving messages",e);
+ }
+
+ }
+
+ public void waitforCompletion(int expected) throws Exception
+ {
+ this.expected = expected;
+ testCompleted.await();
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader()
+ );
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
+ receiver.setUp();
+ receiver.waitforCompletion(config.getMsgCount());
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ receiver.tearDown();
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
new file mode 100644
index 0000000000..c058b83d41
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.apache.qpid.tools.report.BasicReporter;
+import org.apache.qpid.tools.report.Reporter;
+import org.apache.qpid.tools.report.Statistics.Throughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidSend
+{
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageProducer producer;
+ private MessageType msgType;
+ private Message msg;
+ private Object payload;
+ private List<Object> payloads;
+ private boolean cacheMsg = false;
+ private boolean randomMsgSize = false;
+ private boolean durable = false;
+ private Random random;
+ private int msgSizeRange = 1024;
+ private int totalMsgCount = 0;
+ private boolean rateLimitProducer = false;
+ private boolean transacted = false;
+ private int txSize = 0;
+
+ private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
+ Reporter report;
+ TestConfiguration config;
+
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ durable = config.isDurable();
+ rateLimitProducer = config.getSendRate() > 0 ? true : false;
+ if (_logger.isDebugEnabled() && rateLimitProducer)
+ {
+ System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
+ }
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+
+ msgType = MessageType.getType(config.getMessageType());
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (config.isCacheMessage())
+ {
+ cacheMsg = true;
+ msg = createMessage(createPayload(config.getMsgSize()));
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else if (config.isRandomMsgSize())
+ {
+ random = new Random(20080921);
+ randomMsgSize = true;
+ msgSizeRange = config.getMsgSize();
+ payloads = new ArrayList<Object>(msgSizeRange);
+
+ for (int i=0; i < msgSizeRange; i++)
+ {
+ payloads.add(createPayload(i));
+ }
+ }
+ else
+ {
+ payload = createPayload(config.getMsgSize());
+ }
+
+ producer = session.createProducer(dest);
+ if (_logger.isDebugEnabled())
+ {
+ System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
+ }
+ producer.setDisableMessageID(config.isDisableMessageID());
+ producer.setDisableMessageTimestamp(config.isDisableTimestamp());
+ }
+
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (cacheMsg)
+ {
+ return msg;
+ }
+ else
+ {
+ Message m;
+
+ if (!randomMsgSize)
+ {
+ m = createMessage(payload);
+ }
+ else
+ {
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+ }
+ m.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ return m;
+ }
+ }
+
+ public void commit() throws Exception
+ {
+ session.commit();
+ }
+
+ public void send() throws Exception
+ {
+ send(config.getMsgCount());
+ }
+
+ public void send(int count) throws Exception
+ {
+ int sendRate = config.getSendRate();
+ if (rateLimitProducer)
+ {
+ int iterations = count/sendRate;
+ int remainder = count%sendRate;
+ for (int i=0; i < iterations; i++)
+ {
+ long iterationStart = Clock.getTime();
+ sendMessages(sendRate);
+ long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs();
+ long diff = Clock.SEC - elapsed;
+ if (diff > 0)
+ {
+ // We have sent more messages in a sec than specified by the rate.
+ Thread.sleep(diff);
+ }
+ }
+ sendMessages(remainder);
+ }
+ else
+ {
+ sendMessages(count);
+ }
+ }
+
+ private void sendMessages(int count) throws Exception
+ {
+ boolean isTimestamp = config.isReportLatency();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ if (isTimestamp)
+ {
+ msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime());
+ }
+ producer.send(msg);
+ report.message(msg);
+ totalMsgCount++;
+
+ if ( transacted && ((totalMsgCount) % txSize == 0))
+ {
+ session.commit();
+ }
+ }
+ }
+
+ public void resetCounters()
+ {
+ totalMsgCount = 0;
+ report.clear();
+ }
+
+ public void sendEndMessage() throws Exception
+ {
+ Message msg = session.createMessage();
+ msg.setBooleanProperty(TestConfiguration.EOS, true);
+ producer.send(msg);
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(Throughput.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader()
+ );
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
+ sender.setUp();
+ sender.send();
+ if (config.getSendEOS() > 0)
+ {
+ sender.sendEndMessage();
+ }
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ sender.tearDown();
+ }
+}