diff options
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java | 101 |
1 files changed, 80 insertions, 21 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 015d1e6205..a4b2982275 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -56,14 +56,44 @@ public class PerfProducer extends PerfBase { MessageProducer producer; Message msg; - byte[] payload; - List<byte[]> payloads; + Object payload; + List<Object> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - + + enum MessageType { + BYTE, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("byte".equalsIgnoreCase(s)) + { + return BYTE; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + MessageType msgType = MessageType.BYTE; + public PerfProducer() { super(); @@ -75,14 +105,16 @@ public class PerfProducer extends PerfBase feedbackDest = session.createTemporaryQueue(); durable = params.isDurable(); - + msgType = MessageType.getType(params.getMessageType()); + + System.out.println("Using " + msgType + " messages"); + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg = createMessage(createPayload(params.getMsgSize())); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -93,16 +125,16 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<byte[]>(msgSizeRange); - + payloads = new ArrayList<Object>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + payloads.add(createPayload(i)); } - } + } else { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + payload = createPayload(params.getMsgSize()); } producer = session.createProducer(dest); @@ -110,6 +142,33 @@ public class PerfProducer extends PerfBase producer.setDisableMessageTimestamp(params.isDisableTimestamp()); } + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } + + protected Message getNextMessage() throws Exception { if (cacheMsg) @@ -117,22 +176,22 @@ public class PerfProducer extends PerfBase return msg; } else - { - msg = session.createBytesMessage(); - + { + Message m; + if (!randomMsgSize) { - ((BytesMessage)msg).writeBytes(payload); + m = createMessage(payload); } else { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); } - msg.setJMSDeliveryMode(durable? + m.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return msg; + return m; } } @@ -247,16 +306,16 @@ public class PerfProducer extends PerfBase prod.test(); } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating producer thread",e); } - t.start(); + t.start(); } }
\ No newline at end of file |