diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/tools/src/main | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/tools/src/main')
13 files changed, 3258 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java new file mode 100644 index 0000000000..b10129d855 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +public abstract class Client implements ExceptionListener +{ + private Connection con; + private Session ssn; + private boolean durable = false; + private boolean transacted = false; + private int txSize = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private String contentType = "application/octet-stream"; + + private long reportFrequency = 60000; // every min + + private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + private NumberFormat nf = new DecimalFormat("##.00"); + + private long startTime = System.currentTimeMillis(); + private ErrorHandler errorHandler = null; + + public Client(Connection con) throws Exception + { + this.con = con; + this.con.setExceptionListener(this); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + txSize = Integer.getInteger("tx_size",10); + contentType = System.getProperty("content_type","application/octet-stream"); + reportFrequency = Long.getLong("report_frequency", 60000); + } + + public void close() + { + try + { + con.close(); + } + catch (Exception e) + { + handleError("Error closing connection",e); + } + } + + public void onException(JMSException e) + { + handleError("Connection error",e); + } + + public void setErrorHandler(ErrorHandler h) + { + this.errorHandler = h; + } + + public void handleError(String msg,Exception e) + { + if (errorHandler != null) + { + errorHandler.handleError(msg, e); + } + else + { + System.err.println(msg); + e.printStackTrace(); + } + } + + protected Session getSsn() + { + return ssn; + } + + protected void setSsn(Session ssn) + { + this.ssn = ssn; + } + + protected boolean isDurable() + { + return durable; + } + + protected boolean isTransacted() + { + return transacted; + } + + protected int getTxSize() + { + return txSize; + } + + protected int getAck_mode() + { + return ack_mode; + } + + protected String getContentType() + { + return contentType; + } + + protected long getReportFrequency() + { + return reportFrequency; + } + + protected long getStartTime() + { + return startTime; + } + + protected void setStartTime(long startTime) + { + this.startTime = startTime; + } + + public DateFormat getDf() + { + return df; + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java new file mode 100644 index 0000000000..dbc73c404f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -0,0 +1,27 @@ +package org.apache.qpid.testkit; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +public interface ErrorHandler { + + public void handleError(String msg,Exception e); +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java new file mode 100644 index 0000000000..b4294ee4cc --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +/** + * A generic receiver which consumes messages + * from a given address in a broker (host/port) + * until told to stop by killing it. + * + * It participates in a feedback loop to ensure the producer + * doesn't fill up the queue. If it receives an "End" msg + * it sends a reply to the replyTo address in that msg. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * sync_rcv - Whether to consume sync (instead of using a listener). + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. + */ +public class Receiver extends Client implements MessageListener +{ + long msg_count = 0; + int sequence = 0; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); + MessageConsumer consumer; + List<Integer> duplicateMessages = new ArrayList<Integer>(); + + public Receiver(Connection con,String addr) throws Exception + { + super(con); + setSsn(con.createSession(isTransacted(), getAck_mode())); + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) + { + consumer.setMessageListener(this); + } + + System.out.println("Receiving messages from : " + addr); + } + + public void onMessage(Message msg) + { + handleMessage(msg); + } + + public void run() throws Exception + { + long sleepTime = getReportFrequency(); + while(true) + { + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } + } + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); + } + } + + private void handleMessage(Message m) + { + if (m == null) { return; } + + try + { + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) + { + MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); + Message controlMsg = getSsn().createTextMessage(); + temp.send(controlMsg); + if (isTransacted()) + { + getSsn().commit(); + } + temp.close(); + } + else + { + + int seq = m.getIntProperty("sequence"); + if (checkForDups) + { + if (seq == 0) + { + sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); + } + + if (seq < sequence) + { + duplicateMessages.add(seq); + } + else if (seq == sequence) + { + sequence++; + msg_count ++; + } + else + { + // Multiple publishers are not allowed in this test case. + // So out of order messages are not allowed. + throw new Exception(": Received an out of order message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + else + { + msg_count ++; + } + + // Please note that this test case doesn't expect duplicates + // When testing for transactions. + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + handleError("Exception receiving messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Receiver rcv = new Receiver(con,addr); + rcv.run(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java new file mode 100644 index 0000000000..14b9b7302f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.tools.MessageFactory; + +/** + * A generic sender which sends a stream of messages + * to a given address in a broker (host/port) + * until told to stop by killing it. + * + * It has a feedback loop to ensure it doesn't fill + * up queues due to a slow consumer. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * msg_size (256) + * msg_count (10) - # messages before waiting for feedback + * sleep_time (1000 ms) - sleep time btw each iteration + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. + */ +public class Sender extends Client +{ + protected int msg_size = 256; + protected int msg_count = 10; + protected int iterations = -1; + protected long sleep_time = 1000; + + protected Destination dest = null; + protected Destination replyTo = null; + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + + protected MessageProducer producer; + Random gen = new Random(19770905); + + public Sender(Connection con,String addr) throws Exception + { + super(con); + this.msg_size = Integer.getInteger("msg_size", 100); + this.msg_count = Integer.getInteger("msg_count", 10); + this.iterations = Integer.getInteger("iterations", -1); + this.sleep_time = Long.getLong("sleep_time", 1000); + this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); + this.dest = new AMQAnyDestination(addr); + this.producer = getSsn().createProducer(dest); + this.replyTo = getSsn().createTemporaryQueue(); + + System.out.println("Sending messages to : " + addr); + } + + /* + * If msg_size not specified it generates a message + * between 500-1500 bytes. + */ + protected Message getNextMessage() throws Exception + { + int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; + Message msg = (getContentType().equals("text/plain")) ? + MessageFactory.createTextMessage(getSsn(), s): + MessageFactory.createBytesMessage(getSsn(), s); + + msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT + : DeliveryMode.NON_PERSISTENT); + return msg; + } + + public void run() + { + try + { + boolean infinite = (iterations == -1); + for (int x=0; infinite || x < iterations; x++) + { + long now = System.currentTimeMillis(); + if (now - getStartTime() >= getReportFrequency()) + { + System.out.println(df.format(now) + " - iterations : " + x); + setStartTime(now); + } + + for (int i = 0; i < msg_count; i++) + { + Message msg = getNextMessage(); + msg.setIntProperty("sequence",i); + producer.send(msg); + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + TextMessage m = getSsn().createTextMessage("End"); + m.setJMSReplyTo(replyTo); + producer.send(m); + + if (isTransacted()) + { + getSsn().commit(); + } + + MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); + feedbackConsumer.receive(); + feedbackConsumer.close(); + if (isTransacted()) + { + getSsn().commit(); + } + Thread.sleep(sleep_time); + } + } + catch (Exception e) + { + handleError("Exception sending messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Sender sender = new Sender(con,addr); + sender.run(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java new file mode 100644 index 0000000000..72ca48e1c9 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; + +/** + * A basic test case class that could launch a Sender/Receiver + * or both, each on it's own separate thread. + * + * If con_count == ssn_count, then each entity created will have + * it's own Connection. Else if con_count < ssn_count, then + * a connection will be shared by ssn_count/con_count # of entities. + * + * The if both sender and receiver options are set, it will + * share a connection. + * + * The following options are available as jvm args + * host, port + * con_count,ssn_count + * con_idle_time - which determines heartbeat + * sender, receiver - booleans which indicate which entity to create. + * Setting them both is also a valid option. + */ +public class TestLauncher implements ErrorHandler +{ + protected String host = "127.0.0.1"; + protected int port = 5672; + protected int sessions_per_con = 1; + protected int connection_count = 1; + protected long heartbeat = 5000; + protected boolean sender = false; + protected boolean receiver = false; + protected boolean useUniqueDests = false; + protected String url; + + protected String address = "my_queue; {create: always}"; + protected boolean durable = false; + protected String failover = ""; + protected AMQConnection controlCon; + protected Destination controlDest = null; + protected Session controlSession = null; + protected MessageProducer statusSender; + protected List<AMQConnection> clients = new ArrayList<AMQConnection>(); + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + protected String testName; + + public TestLauncher() + { + testName = System.getProperty("test_name","UNKNOWN"); + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); + connection_count = Integer.getInteger("con_count", 1); + heartbeat = Long.getLong("heartbeat", 5); + sender = Boolean.getBoolean("sender"); + receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); + + failover = System.getProperty("failover", ""); + durable = Boolean.getBoolean("durable"); + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; + + if (failover.equalsIgnoreCase("failover_exchange")) + { + url += "&failover='failover_exchange'"; + + System.out.println("Failover exchange " + url ); + } + + configureLogging(); + } + + protected void configureLogging() + { + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); + BasicConfigurator.configure(new ConsoleAppender(layout)); + + String logLevel = System.getProperty("log.level","warn"); + String logComponent = System.getProperty("log.comp","org.apache.qpid"); + + Logger logger = Logger.getLogger(logComponent); + logger.setLevel(Level.toLevel(logLevel, Level.WARN)); + + System.out.println("Level " + logger.getLevel()); + + } + + public void setUpControlChannel() + { + try + { + controlCon = new AMQConnection(url); + controlCon.start(); + + controlDest = new AMQAnyDestination("control; {create: always}"); // durable + + // Create the session to setup the messages + controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + statusSender = controlSession.createProducer(controlDest); + + } + catch (Exception e) + { + handleError("Error while setting up the test",e); + } + } + + public void cleanup() + { + try + { + controlSession.close(); + controlCon.close(); + for (AMQConnection con : clients) + { + con.close(); + } + } + catch (Exception e) + { + handleError("Error while tearing down the test",e); + } + } + + public void start(String addr) + { + try + { + if (addr == null) + { + addr = address; + } + + int ssn_per_con = sessions_per_con; + String addrTemp = addr; + for (int i = 0; i< connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + clients.add(con); + for (int j = 0; j< ssn_per_con; j++) + { + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + + if (sender) + { + createSender(index,con,addrTemp,this); + } + + if (receiver) + { + System.out.println("########## Creating receiver ##################"); + + createReceiver(index,con,addrTemp,this); + } + } + } + } + catch (Exception e) + { + handleError("Exception while setting up the test",e); + } + + } + + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Receiver rcv = new Receiver(con,addr); + rcv.setErrorHandler(h); + rcv.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Receiver", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Receive thread",e); + } + + t.setName("ReceiverThread-" + index); + t.start(); + } + + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Sender sender = new Sender(con, addr); + sender.setErrorHandler(h); + sender.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Sender", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Sender thread",e); + } + + t.setName("SenderThread-" + index); + t.start(); + } + + public synchronized void handleError(String msg,Exception e) + { + // In case sending the message fails + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + + try + { + TextMessage errorMsg = controlSession.createTextMessage(); + errorMsg.setStringProperty("status", "error"); + errorMsg.setStringProperty("desc", msg); + errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); + errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { + e1.printStackTrace(); + } + } + + private String serializeStackTrace(Exception e) + { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(bOut); + e.printStackTrace(printStream); + printStream.close(); + return bOut.toString(); + } + + private String createPrefix(int i, int j) + { + return String.valueOf(i).concat(String.valueOf(j)); + } + + /** + * A basic helper function to modify the subjects by + * appending an index. + */ + private String modifySubject(String index,String addr) + { + if (addr.indexOf("/") > 0) + { + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); + } + else + { + addr = addr + "/" + index; + } + + return addr; + } + + public static void main(String[] args) + { + final TestLauncher test = new TestLauncher(); + test.setUpControlChannel(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { test.cleanup(); } + }); + + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java new file mode 100644 index 0000000000..2390516ef0 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tools; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.jms.FailoverPolicy; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.Hashtable; +import java.util.Enumeration; +import java.util.List; +import java.util.LinkedList; +import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; + +public class JNDICheck +{ + private static final String QUEUE = "queue."; + private static final String TOPIC = "topic."; + private static final String DESTINATION = "destination."; + private static final String CONNECTION_FACTORY = "connectionfactory."; + + public static void main(String[] args) + { + + if (args.length != 1) + { + usage(); + } + + String propertyFile = args[0]; + + new JNDICheck(propertyFile); + } + + private static void usage() + { + exit("Usage: JNDICheck <JNDI Config file>", 0); + } + + private static void exit(String message, int exitCode) + { + System.err.println(message); + System.exit(exitCode); + } + + private static String JAVA_NAMING = "java.naming.factory.initial"; + + Context _context = null; + Hashtable _environment = null; + + public JNDICheck(String propertyFile) + { + + // Load JNDI properties + Properties properties = new Properties(); + + try + { + properties.load(new FileInputStream(new File(propertyFile))); + } + catch (IOException e) + { + exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); + } + + //Create the initial context + try + { + + System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); + + _context = new InitialContext(properties); + + _environment = _context.getEnvironment(); + + Enumeration keys = _environment.keys(); + + List<String> queues = new LinkedList<String>(); + List<String> topics = new LinkedList<String>(); + List<String> destinations = new LinkedList<String>(); + List<String> connectionFactories = new LinkedList<String>(); + + while (keys.hasMoreElements()) + { + String key = keys.nextElement().toString(); + + if (key.startsWith(QUEUE)) + { + queues.add(key); + } + else if (key.startsWith(TOPIC)) + { + topics.add(key); + } + else if (key.startsWith(DESTINATION)) + { + destinations.add(key); + } + else if (key.startsWith(CONNECTION_FACTORY)) + { + connectionFactories.add(key); + } + } + + printHeader(propertyFile); + printEntries(QUEUE, queues); + printEntries(TOPIC, topics); + printEntries(DESTINATION, destinations); + printEntries(CONNECTION_FACTORY, connectionFactories); + + } + catch (NamingException e) + { + exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); + } + + } + + private void printHeader(String file) + { + print("JNDI file :" + file); + } + + private void printEntries(String type, List<String> list) + { + if (list.size() > 0) + { + String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); + print(name + " elements in file:"); + printList(list); + print(""); + } + } + + private void printList(List<String> list) + { + for (String item : list) + { + String key = item.substring(item.indexOf('.') + 1); + + try + { + print(key, _context.lookup(key)); + } + catch (NamingException e) + { + exit("Error: item " + key + " no longer in context.", 1); + } + } + } + + private void print(String key, Object object) + { + if (object instanceof AMQDestination) + { + print(key + ":" + object); + } + else if (object instanceof AMQConnectionFactory) + { + AMQConnectionFactory factory = (AMQConnectionFactory) object; + print(key + ":Connection"); + print("ConnectionURL:"); + print(factory.getConnectionURL().toString()); + print("FailoverPolicy"); + print(new FailoverPolicy(factory.getConnectionURL(),null).toString()); + print(""); + } + } + + private void print(String msg) + { + System.out.println(msg); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java new file mode 100644 index 0000000000..b88b242e6d --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -0,0 +1,349 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * Latency test sends an x number of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y number of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * It is important to have a sufficiently large number for the warmup count to + * ensure the system is in steady state before the test is started. + * + * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) + * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 + * + * The idea is to get a latency sample for the system once it achieves steady state. + * + */ + +public class LatencyTest extends PerfBase implements MessageListener +{ + MessageProducer producer; + MessageConsumer consumer; + Message msg; + byte[] payload; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + double stdDev = 0; + double avgLatency = 0; + boolean warmup_mode = true; + boolean transacted = false; + int transSize = 0; + + final List<Long> latencies; + final Lock lock = new ReentrantLock(); + final Condition warmedUp; + final Condition testCompleted; + + public LatencyTest() + { + super(); + warmedUp = lock.newCondition(); + testCompleted = lock.newCondition(); + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + latencies = new ArrayList <Long>(params.getMsgCount()); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(params.isDurable()? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (params.isCacheMessage()) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + ((BytesMessage)msg).writeBytes(payload); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + int count = params.getWarmupCount(); + for (int i=0; i < count; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + try + { + lock.lock(); + warmedUp.await(); + } + finally + { + lock.unlock(); + } + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + if (warmup_mode) + { + warmup_mode = false; + try + { + lock.lock(); + warmedUp.signal(); + } + finally + { + lock.unlock(); + } + } + else + { + computeStats(); + } + } + else if (!warmup_mode) + { + long time = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = time - msg.getJMSTimestamp(); + latencies.add(latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + private void computeStats() + { + avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double sigma = 0; + + for (long latency: latencies) + { + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + sigma = sigma + Math.pow(latency - avgLatency,2); + } + + stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); + + try + { + lock.lock(); + testCompleted.signal(); + } + finally + { + lock.unlock(); + } + } + + public void writeToFile() throws Exception + { + String fileName = System.getProperty("file"); + PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); + for (long latency: latencies) + { + writer.println(String.valueOf(latency)); + } + writer.flush(); + writer.close(); + } + + public void printToConsole() + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Standard Deviation : "). + append(df.format(stdDev)). + append(" ms").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % transSize == 0)) + { + session.commit(); + } + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + if (params.isTransacted()) + { + session.commit(); + } + } + + public void tearDown() throws Exception + { + try + { + lock.lock(); + testCompleted.await(); + } + finally + { + lock.unlock(); + } + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() + { + public void run() + { + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java new file mode 100644 index 0000000000..8ab1379fce --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java @@ -0,0 +1,64 @@ +package org.apache.qpid.tools; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class MessageFactory +{ + public static Message createBytesMessage(Session ssn, int size) throws JMSException + { + BytesMessage msg = ssn.createBytesMessage(); + msg.writeBytes(createMessagePayload(size).getBytes()); + return msg; + } + + public static Message createTextMessage(Session ssn, int size) throws JMSException + { + TextMessage msg = ssn.createTextMessage(); + msg.setText(createMessagePayload(size)); + return msg; + } + + public static String createMessagePayload(int size) + { + String msgData = "Qpid Test Message"; + + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count <= (size - msgData.length())) + { + buf.append(msgData); + count += msgData.length(); + } + if (count < size) + { + buf.append(msgData, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java new file mode 100644 index 0000000000..ac597d17de --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +public class PerfBase +{ + TestParams params; + Connection con; + Session session; + Destination dest; + Destination feedbackDest; + DecimalFormat df = new DecimalFormat("###.##"); + + public PerfBase() + { + params = new TestParams(); + } + + public void setUp() throws Exception + { + + if (params.getHost().equals("") || params.getPort() == -1) + { + con = new AMQConnection(params.getUrl()); + } + else + { + con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); + } + con.start(); + session = con.createSession(params.isTransacted(), + params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + + dest = new AMQAnyDestination(params.getAddress()); + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java new file mode 100644 index 0000000000..0ef0455a64 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * hen the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount + * + * Throughput + * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class PerfConsumer extends PerfBase implements MessageListener +{ + MessageConsumer consumer; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput + long startTime = 0; // to measure consumer throughput + long rcvdTime = 0; + boolean transacted = false; + int transSize = 0; + + final Object lock = new Object(); + + public PerfConsumer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + + boolean start = false; + while (!start) + { + Message msg = consumer.receive(); + if (msg instanceof TextMessage) + { + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } + } + } + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + consumer.setMessageListener(this); + } + + public void printResults() throws Exception + { + synchronized (lock) + { + lock.wait(); + } + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Consumer rate : "). + append(df.format(consRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void notifyCompletion(Destination replyTo) throws Exception + { + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) + { + session.commit(); + } + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + notifyCompletion(msg.getJMSReplyTo()); + + synchronized (lock) + { + lock.notifyAll(); + } + } + else + { + rcvdTime = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (rcvdMsgCount == 1) + { + startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); + } + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = rcvdTime - msg.getJMSTimestamp(); + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + printResults(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public static void main(String[] args) + { + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java new file mode 100644 index 0000000000..015d1e6205 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +import org.apache.qpid.thread.Threading; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + */ +public class PerfProducer extends PerfBase +{ + MessageProducer producer; + Message msg; + byte[] payload; + List<byte[]> payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + + public PerfProducer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + feedbackDest = session.createTemporaryQueue(); + + durable = params.isDurable(); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + cacheMsg = true; + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList<byte[]>(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + } + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + + if (!randomMsgSize) + { + ((BytesMessage)msg).writeBytes(payload); + } + else + { + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + } + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long start = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + } + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void waitForCompletion() throws Exception + { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); + } + + public void tearDown() throws Exception + { + producer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + waitForCompletion(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java new file mode 100644 index 0000000000..602fcc6321 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -0,0 +1,904 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; + +/** + * QpidBench + * + */ + +public class QpidBench +{ + + static enum Mode + { + PUBLISH, CONSUME, BOTH + } + + private static class Options + { + private StringBuilder usage = new StringBuilder("qpid-bench <options>"); + + void usage(String name, String description, Object def) + { + String defval = ""; + if (def != null) + { + defval = String.format(" (%s)", def); + } + usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); + } + + public String broker = "localhost"; + public int port = 5672; + public long count = 1000000; + public long window = 100000; + public long sample = window; + public int size = 1024; + public Mode mode = BOTH; + public boolean timestamp = false; + public boolean message_id = false; + public boolean message_cache = false; + public boolean persistent = false; + public boolean jms_publish = false; + public boolean jms_consume = false; + public boolean help = false; + + { + usage("-b, --broker", "the broker hostname", broker); + } + + public void parse__broker(String b) + { + this.broker = b; + } + + public void parse_b(String b) + { + parse__broker(b); + } + + { + usage("-p, --port", "the broker port", port); + } + + public void parse__port(String p) + { + this.port = Integer.parseInt(p); + } + + public void parse_p(String p) + { + parse__port(p); + } + + { + usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); + } + + public void parse__count(String c) + { + this.count = Long.parseLong(c); + } + + public void parse_c(String c) + { + parse__count(c); + } + + { + usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); + } + + public void parse__window(String w) + { + this.window = Long.parseLong(w); + } + + public void parse_w(String w) + { + parse__window(w); + } + + { + usage("--sample", "print stats after this many messages, 0 disables", sample); + } + + public void parse__sample(String s) + { + this.sample = Long.parseLong(s); + } + + { + usage("-i, --interval", "sets both --window and --sample", window); + } + + public void parse__interval(String i) + { + this.window = Long.parseLong(i); + this.sample = window; + } + + public void parse_i(String i) + { + parse__interval(i); + } + + { + usage("-s, --size", "the message size", size); + } + + public void parse__size(String s) + { + this.size = Integer.parseInt(s); + } + + public void parse_s(String s) + { + parse__size(s); + } + + { + usage("-m, --mode", "one of publish, consume, or both", mode); + } + + public void parse__mode(String m) + { + if (m.equalsIgnoreCase("publish")) + { + this.mode = PUBLISH; + } + else if (m.equalsIgnoreCase("consume")) + { + this.mode = CONSUME; + } + else if (m.equalsIgnoreCase("both")) + { + this.mode = BOTH; + } + else + { + throw new IllegalArgumentException + ("must be one of 'publish', 'consume', or 'both'"); + } + } + + public void parse_m(String m) + { + parse__mode(m); + } + + { + usage("--timestamp", "set timestamps on each message if true", timestamp); + } + + public void parse__timestamp(String t) + { + this.timestamp = Boolean.parseBoolean(t); + } + + { + usage("--mesage-id", "set the message-id on each message if true", message_id); + } + + public void parse__message_id(String m) + { + this.message_id = Boolean.parseBoolean(m); + } + + { + usage("--message-cache", "reuse the same message for each send if true", message_cache); + } + + public void parse__message_cache(String c) + { + this.message_cache = Boolean.parseBoolean(c); + } + + { + usage("--persistent", "set the delivery-mode to persistent if true", persistent); + } + + public void parse__persistent(String p) + { + this.persistent = Boolean.parseBoolean(p); + } + + { + usage("--jms-publish", "use the jms client for publish", jms_publish); + } + + public void parse__jms_publish(String jp) + { + this.jms_publish = Boolean.parseBoolean(jp); + } + + { + usage("--jms-consume", "use the jms client for consume", jms_consume); + } + + public void parse__jms_consume(String jc) + { + this.jms_consume = Boolean.parseBoolean(jc); + } + + { + usage("--jms", "sets both --jms-publish and --jms-consume", false); + } + + public void parse__jms(String j) + { + this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); + } + + { + usage("-h, --help", "prints this message", null); + } + + public void parse__help() + { + this.help = true; + } + + public void parse_h() + { + parse__help(); + } + + public String parse(String ... args) + { + Class klass = getClass(); + List<String> arguments = new ArrayList<String>(); + for (int i = 0; i < args.length; i++) + { + String option = args[i]; + + if (!option.startsWith("-")) + { + arguments.add(option); + continue; + } + + String method = "parse" + option.replace('-', '_'); + try + { + try + { + Method parser = klass.getMethod(method); + parser.invoke(this); + } + catch (NoSuchMethodException e) + { + try + { + Method parser = klass.getMethod(method, String.class); + + String value = null; + if (i + 1 < args.length) + { + value = args[i+1]; + i++; + } + else + { + return option + " requires a value"; + } + + parser.invoke(this, value); + } + catch (NoSuchMethodException e2) + { + return "no such option: " + option; + } + } + } + catch (InvocationTargetException e) + { + Throwable t = e.getCause(); + return String.format + ("error parsing %s: %s: %s", option, t.getClass().getName(), + t.getMessage()); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access parse method: " + option, e); + } + } + + return parseArguments(arguments); + } + + public String parseArguments(List<String> arguments) + { + if (arguments.size() > 0) + { + String args = arguments.toString(); + return "unrecognized arguments: " + args.substring(1, args.length() - 1); + } + else + { + return null; + } + } + + public String toString() + { + Class klass = getClass(); + Field[] fields = klass.getFields(); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < fields.length; i++) + { + if (i > 0) + { + str.append("\n"); + } + + String name = fields[i].getName(); + str.append(name); + str.append(" = "); + Object value; + try + { + value = fields[i].get(this); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access field: " + name, e); + } + str.append(value); + } + + return str.toString(); + } + } + + public static final void main(String[] args) throws Exception + { + final Options opts = new Options(); + String error = opts.parse(args); + if (error != null) + { + System.err.println(error); + System.exit(-1); + return; + } + + if (opts.help) + { + System.out.println(opts.usage); + return; + } + + System.out.println(opts); + + switch (opts.mode) + { + case CONSUME: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_consume) + { + jms_consumer(opts); + } + else + { + native_consumer(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Consumer Completed"); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + break; + } + + switch (opts.mode) + { + case PUBLISH: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_publish) + { + jms_publisher(opts); + } + else + { + native_publisher(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Producer Completed"); + } + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); + break; + } + } + + private static enum Column + { + LEFT, RIGHT + } + + private static final void sample(Options opts, Column col, String name, long count, + long start, long time, long lastTime) + { + String pfx = ""; + String sfx = ""; + if (opts.mode == BOTH) + { + if (col == Column.RIGHT) + { + pfx = " -- "; + } + else + { + sfx = " --"; + } + } + + if (count == 0) + { + String stats = String.format("%s: %tc", name, start); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + return; + } + + double cumulative = 1000 * (double) count / (double) (time - start); + double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); + + String stats = String.format + ("%s: %d %.2f %.2f", name, count, cumulative, interval); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + } + + private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception + { + String url = String.format + ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", + opts.broker, opts.port); + return new AMQConnection(url); + } + + private static final void jms_publisher(Options opts) throws Exception + { + javax.jms.Connection conn = getJMSConnection(opts); + + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageProducer prod = ssn.createProducer(dest); + MessageConsumer cons = ssn.createConsumer(echo_dest); + prod.setDisableMessageID(!opts.message_id); + prod.setDisableMessageTimestamp(!opts.timestamp); + prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + StringBuilder str = new StringBuilder(); + for (int i = 0; i < opts.size; i++) + { + str.append((char) (i % 128)); + } + + String body = str.toString(); + + TextMessage cached = ssn.createTextMessage(); + cached.setText(body); + + conn.start(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + Message echo = cons.receive(); + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "JP", count, start, time, lastTime); + lastTime = time; + } + + TextMessage m; + if (opts.message_cache) + { + m = cached; + } + else + { + m = ssn.createTextMessage(); + m.setText(body); + } + + prod.send(m); + count++; + } + + conn.close(); + } + + private static final void jms_consumer(final Options opts) throws Exception + { + final javax.jms.Connection conn = getJMSConnection(opts); + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageConsumer cons = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(echo_dest); + prod.setDisableMessageID(true); + prod.setDisableMessageTimestamp(true); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final TextMessage echo = ssn.createTextMessage(); + echo.setText("ECHO"); + + final Object done = new Object(); + cons.setMessageListener(new MessageListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void onMessage(Message m) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + try + { + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + prod.send(echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); + lastTime = time; + } + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + conn.start(); + synchronized (done) + { + done.wait(); + } + conn.close(); + } + + private static final org.apache.qpid.transport.Connection getConnection + (Options opts) + { + org.apache.qpid.transport.Connection conn = + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest",false); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void resumed(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(org.apache.qpid.transport.Session ssn) {} + + } + + private static final void native_publisher(Options opts) throws Exception + { + final long[] echos = { 0 }; + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + MessageProperties cached_mp = new MessageProperties(); + DeliveryProperties cached_dp = new DeliveryProperties(); + cached_dp.setRoutingKey("test-queue"); + cached_dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + int size = opts.size; + ByteBuffer body = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + { + body.put((byte) i); + } + body.flip(); + + ssn.invoke(new MessageSubscribe() + .queue("echo-queue") + .destination("echo-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + UUIDGen gen = UUIDs.newGenerator(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + synchronized (echos) + { + while (echos[0] < (count/opts.window)) + { + echos.wait(); + } + } + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "NP", count, start, time, lastTime); + lastTime = time; + } + + MessageProperties mp; + DeliveryProperties dp; + if (opts.message_cache) + { + mp = cached_mp; + dp = cached_dp; + } + else + { + mp = new MessageProperties(); + dp = new DeliveryProperties(); + dp.setRoutingKey("test-queue"); + dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + } + + if (opts.message_id) + { + mp.setMessageId(gen.generate()); + } + + if (opts.timestamp) + { + dp.setTimestamp(System.currentTimeMillis()); + } + + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), body.slice()); + count++; + } + + ssn.messageCancel("echo-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + + private static final void native_consumer(final Options opts) throws Exception + { + final DeliveryProperties dp = new DeliveryProperties(); + final byte[] echo = new byte[0]; + dp.setRoutingKey("echo-queue"); + dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + final MessageProperties mp = new MessageProperties(); + final Object done = new Object(); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + ssn.invoke(new MessageSubscribe() + .queue("test-queue") + .destination("test-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + synchronized (done) + { + done.wait(); + } + + ssn.messageCancel("test-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java new file mode 100644 index 0000000000..89d6462a39 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Session; + +public class TestParams +{ + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; + + private String address = "queue; {create : always}"; + + private int msg_size = 1024; + + private int msg_type = 1; // not used yet + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private boolean transacted = false; + + private int transaction_size = 1000; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + public TestParams() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address","queue"); + + msg_size = Integer.getInteger("msg_size", 1024); + msg_type = Integer.getInteger("msg_type",1); + cacheMessage = Boolean.getBoolean("cache_msg"); + disableMessageID = Boolean.getBoolean("disableMessageID"); + disableTimestamp = Boolean.getBoolean("disableTimestamp"); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + transaction_size = Integer.getInteger("trans_size",1000); + ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg_count",msg_count); + warmup_count = Integer.getInteger("warmup_count",warmup_count); + random_msg_size = Boolean.getBoolean("random_msg_size"); + } + + public String getUrl() + { + return url; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public String getAddress() + { + return address; + } + + public int getAckMode() + { + return ack_mode; + } + + public int getMsgCount() + { + return msg_count; + } + + public int getMsgSize() + { + return msg_size; + } + + public int getMsgType() + { + return msg_type; + } + + public boolean isDurable() + { + return durable; + } + + public boolean isTransacted() + { + return transacted; + } + + public int getTransactionSize() + { + return transaction_size; + } + + public int getWarmupCount() + { + return warmup_count; + } + + public boolean isCacheMessage() + { + return cacheMessage; + } + + public boolean isDisableMessageID() + { + return disableMessageID; + } + + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + public boolean isRandomMsgSize() + { + return random_msg_size; + } + +} |