/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.testkit; import java.text.DateFormat; import java.text.DecimalFormat; import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.Random; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.tools.MessageFactory; /** * A generic sender which sends a stream of messages * to a given address in a broker (host/port) * until told to stop by killing it. * * It has a feedback loop to ensure it doesn't fill * up queues due to a slow consumer. * * It doesn't check for correctness or measure anything * leaving those concerns to another entity. * However it prints a timestamp every x secs(-Dreport_frequency) * as checkpoint to figure out how far the test has progressed if * a failure occurred. * * It also takes in an optional Error handler to * pass out any error in addition to writing them to std err. 
*
 * This is intended more as a building block to create
 * more complex test cases. However there is a main method
 * provided to use this standalone.
 *
 * The following options are available and configurable
 * via jvm args.
 *
 * msg_size (100) - payload size in bytes; -1 picks a random size per message
 * msg_count (10) - # messages before waiting for feedback
 * iterations (-1) - # of send/feedback rounds; -1 means run until killed
 * sleep_time (1000 ms) - sleep time btw each iteration
 * report_frequency - how often a timestamp is printed
 * durable
 * transacted
 * tx_size - size of transaction batch in # msgs.
 */
public class Sender extends Client {

    /** Payload size in bytes; -1 requests a pseudo-random size per message. */
    protected int msg_size = 100;

    /** Number of messages sent in each round before the "End" marker. */
    protected int msg_count = 10;

    /** Number of send/feedback rounds to run; -1 means loop forever. */
    protected int iterations = -1;

    /** Pause, in milliseconds, after each round. */
    protected long sleep_time = 1000;

    /** Destination the message stream is sent to. */
    protected Destination dest = null;

    /** Temporary queue advertised as JMSReplyTo on the "End" marker. */
    protected Destination replyTo = null;

    /** Format of the timestamp printed with each progress report. */
    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");

    /** Not used by this class; kept because it is visible to subclasses. */
    protected NumberFormat nf = new DecimalFormat("##.00");

    protected MessageProducer producer;

    /** Fixed seed, so the sequence of random payload sizes is repeatable. */
    Random gen = new Random(19770905);

    /**
     * Creates the session, producer and feedback queue on the given connection
     * and reads the sender options from system properties.
     *
     * @param con  connection used to create the session
     * @param addr address string of the destination to send to
     * @throws Exception if any of the JMS objects cannot be created
     */
    public Sender(Connection con, String addr) throws Exception {
        super(con);

        this.msg_size = Integer.getInteger("msg_size", 100);
        this.msg_count = Integer.getInteger("msg_count", 10);
        this.iterations = Integer.getInteger("iterations", -1);
        this.sleep_time = Long.getLong("sleep_time", 1000);

        this.setSsn(con.createSession(isTransacted(), Session.AUTO_ACKNOWLEDGE));
        this.dest = new AMQAnyDestination(addr);
        this.producer = getSsn().createProducer(dest);
        this.replyTo = getSsn().createTemporaryQueue();

        System.out.println("Sending messages to : " + addr);
    }

    /**
     * Builds the next message to send.
     *
     * If msg_size is -1 the payload size is drawn from the seeded generator
     * and lies between 500 and 1499 bytes; otherwise msg_size is used as is.
     * A text message is created when the content type is "text/plain", a
     * bytes message in every other case (including a null content type).
     *
     * @return a message whose delivery mode reflects the durable setting
     * @throws Exception if the message cannot be created
     */
    protected Message getNextMessage() throws Exception {
        int size = (msg_size == -1) ? 500 + gen.nextInt(1000) : msg_size;

        // Constant-first comparison: a null content type must not throw.
        Message msg = "text/plain".equals(getContentType())
                ? MessageFactory.createTextMessage(getSsn(), size)
                : MessageFactory.createBytesMessage(getSsn(), size);

        msg.setJMSDeliveryMode(isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
        return msg;
    }

    /**
     * Runs the send loop: each round sends msg_count messages followed by an
     * "End" marker, blocks until a message arrives on the reply queue, then
     * sleeps for sleep_time. Any failure is passed to handleError and ends
     * the loop.
     */
    public void run() {
        try {
            boolean infinite = (iterations == -1);
            for (int x = 0; infinite || x < iterations; x++) {
                long now = System.currentTimeMillis();
                if (now - getStartTime() >= getReportFrequency()) {
                    System.out.println(df.format(now) + " - iterations : " + x);
                    setStartTime(now);
                }

                sendBatch();
                awaitFeedback();

                Thread.sleep(sleep_time);
            }
        } catch (InterruptedException e) {
            handleError("Exception sending messages", e);
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            handleError("Exception sending messages", e);
        }
    }

    /** Sends one round of msg_count messages followed by the "End" marker. */
    private void sendBatch() throws Exception {
        boolean transacted = isTransacted();
        long txSize = getTxSize();

        for (int i = 0; i < msg_count; i++) {
            Message msg = getNextMessage();
            msg.setIntProperty("sequence", i);
            producer.send(msg);

            // Commit after every txSize-th message of the round. The position
            // in the round (i + 1) drives the decision, not the round size,
            // and a non-positive txSize simply disables intermediate commits.
            if (transacted && txSize > 0 && (i + 1) % txSize == 0) {
                getSsn().commit();
            }
        }

        TextMessage end = getSsn().createTextMessage("End");
        end.setJMSReplyTo(replyTo);
        producer.send(end);

        // Also flushes any messages left over from an incomplete batch.
        if (transacted) {
            getSsn().commit();
        }
    }

    /** Blocks until a message arrives on the reply queue, then releases the consumer. */
    private void awaitFeedback() throws Exception {
        MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
        try {
            feedbackConsumer.receive();
        } finally {
            // Close even when receive() fails so the consumer is not leaked.
            feedbackConsumer.close();
        }

        // In a transacted session the received feedback is consumed by this commit.
        if (isTransacted()) {
            getSsn().commit();
        }
    }

    /**
     * Standalone entry point.
     *
     * Usage: Sender [host [port [address]]]
     * Defaults: 127.0.0.1, 5672, message_queue.
     *
     * @param args optional host, port and address, in that order
     * @throws Exception if the connection or the sender cannot be set up
     */
    public static void main(String[] args) throws Exception {
        String host = (args.length > 0) ? args[0] : "127.0.0.1";
        int port = (args.length > 1) ? Integer.parseInt(args[1]) : 5672;
        String addr = (args.length > 2) ? args[2] : "message_queue";

        AMQConnection con = new AMQConnection(
                "amqp://username:password@topicClientid/test?brokerlist='tcp://"
                + host + ":" + port + "'");
        try {
            Sender sender = new Sender(con, addr);
            // JMS connections are created stopped; without start() the feedback
            // receive() would never be handed a message. Starting a connection
            // that is already started is ignored, so this is safe either way.
            con.start();
            sender.run();
        } finally {
            con.close();
        }
    }
}