summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-06-28 19:22:27 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-06-28 19:22:27 +0000
commit531d7bb278d19e9c20a7c2ae84a02581830de78b (patch)
tree75ea0488e4de2f0867e002da2bbecf455cc4cb87
parentc0635d700dfd3f3a032adbc4c84e049bd5b47722 (diff)
downloadqpid-python-531d7bb278d19e9c20a7c2ae84a02581830de78b.tar.gz
NO-JIRA Added the ability to specify a message type. Currently on bytes
and text messages are supported. Hoping to add map and object message support in the near future. You could use -Dmsg_type=TEXT|BYTES to specify the type. The default is BYTES. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1140793 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java101
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java13
2 files changed, 90 insertions, 24 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 015d1e6205..a4b2982275 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -56,14 +56,44 @@ public class PerfProducer extends PerfBase
{
MessageProducer producer;
Message msg;
- byte[] payload;
- List<byte[]> payloads;
+ Object payload;
+ List<Object> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
-
+
+ enum MessageType {
+ BYTE, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("byte".equalsIgnoreCase(s))
+ {
+ return BYTE;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ MessageType msgType = MessageType.BYTE;
+
public PerfProducer()
{
super();
@@ -75,14 +105,16 @@ public class PerfProducer extends PerfBase
feedbackDest = session.createTemporaryQueue();
durable = params.isDurable();
-
+ msgType = MessageType.getType(params.getMessageType());
+
+ System.out.println("Using " + msgType + " messages");
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg = createMessage(createPayload(params.getMsgSize()));
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -93,16 +125,16 @@ public class PerfProducer extends PerfBase
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
+ payloads = new ArrayList<Object>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ payloads.add(createPayload(i));
}
- }
+ }
else
{
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ payload = createPayload(params.getMsgSize());
}
producer = session.createProducer(dest);
@@ -110,6 +142,33 @@ public class PerfProducer extends PerfBase
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
}
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
+ }
+
+
protected Message getNextMessage() throws Exception
{
if (cacheMsg)
@@ -117,22 +176,22 @@ public class PerfProducer extends PerfBase
return msg;
}
else
- {
- msg = session.createBytesMessage();
-
+ {
+ Message m;
+
if (!randomMsgSize)
{
- ((BytesMessage)msg).writeBytes(payload);
+ m = createMessage(payload);
}
else
{
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
}
- msg.setJMSDeliveryMode(durable?
+ m.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return msg;
+ return m;
}
}
@@ -247,16 +306,16 @@ public class PerfProducer extends PerfBase
prod.test();
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating producer thread",e);
}
- t.start();
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index 5af53e4a70..70e94a2346 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -62,12 +62,14 @@ public class TestParams
private int msg_count = 10;
private int warmup_count = 1;
-
+
private boolean random_msg_size = false;
+ private String msgType = "byte";
+
public TestParams()
{
-
+
url = System.getProperty("url",url);
host = System.getProperty("host","");
port = Integer.getInteger("port", -1);
@@ -85,6 +87,7 @@ public class TestParams
msg_count = Integer.getInteger("msg_count",msg_count);
warmup_count = Integer.getInteger("warmup_count",warmup_count);
random_msg_size = Boolean.getBoolean("random_msg_size");
+ msgType = System.getProperty("msg_type","byte");
}
public String getUrl()
@@ -161,10 +164,14 @@ public class TestParams
{
return disableTimestamp;
}
-
+
public boolean isRandomMsgSize()
{
return random_msg_size;
}
+ public String getMessageType()
+ {
+ return msgType;
+ }
}