summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java101
1 files changed, 80 insertions, 21 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 015d1e6205..a4b2982275 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -56,14 +56,44 @@ public class PerfProducer extends PerfBase
{
MessageProducer producer;
Message msg;
- byte[] payload;
- List<byte[]> payloads;
+ Object payload;
+ List<Object> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
-
+
+ enum MessageType {
+ BYTE, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("byte".equalsIgnoreCase(s))
+ {
+ return BYTE;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ MessageType msgType = MessageType.BYTE;
+
public PerfProducer()
{
super();
@@ -75,14 +105,16 @@ public class PerfProducer extends PerfBase
feedbackDest = session.createTemporaryQueue();
durable = params.isDurable();
-
+ msgType = MessageType.getType(params.getMessageType());
+
+ System.out.println("Using " + msgType + " messages");
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg = createMessage(createPayload(params.getMsgSize()));
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -93,16 +125,16 @@ public class PerfProducer extends PerfBase
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
+ payloads = new ArrayList<Object>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ payloads.add(createPayload(i));
}
- }
+ }
else
{
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ payload = createPayload(params.getMsgSize());
}
producer = session.createProducer(dest);
@@ -110,6 +142,33 @@ public class PerfProducer extends PerfBase
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
}
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
+ }
+
+
protected Message getNextMessage() throws Exception
{
if (cacheMsg)
@@ -117,22 +176,22 @@ public class PerfProducer extends PerfBase
return msg;
}
else
- {
- msg = session.createBytesMessage();
-
+ {
+ Message m;
+
if (!randomMsgSize)
{
- ((BytesMessage)msg).writeBytes(payload);
+ m = createMessage(payload);
}
else
{
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
}
- msg.setJMSDeliveryMode(durable?
+ m.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return msg;
+ return m;
}
}
@@ -247,16 +306,16 @@ public class PerfProducer extends PerfBase
prod.test();
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating producer thread",e);
}
- t.start();
+ t.start();
}
} \ No newline at end of file