summaryrefslogtreecommitdiff
path: root/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java')
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java96
1 files changed, 69 insertions, 27 deletions
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
index c33f9ffbf2..c240ecdf2e 100644
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.testkit.soak;
+import java.util.Random;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -29,15 +31,21 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testkit.TestLauncher;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
* ================
* This test will open x number of connections where each
* connection will create a session and a producer/consumer pair,
- * and then send configurable no of messages.
- * It will them sleep for configurable time interval and
+ * and then a randomly selected set of connections (about 1/4th)
+ * will send a configurable no of messages and try to receive them.
+ * It will then sleep for configurable time interval and
* tear down the connections/sessions/consumers.
* It will then repeat the process again until the test is stopped.
*
@@ -46,16 +54,14 @@ import org.apache.qpid.framing.AMQShortString;
* To find if the broker has leaks when cleaning resources.
* To find if the client has leaks with resources.
*/
-public class ResourceLeakTest extends BaseTest
+public class ResourceLeakTest extends TestLauncher
{
- protected int connection_count = 10;
- protected long connection_idle_time = 5000;
-
+ /* protected long connection_idle_time = 5000;
+ protected Random rand = new Random();
+
public ResourceLeakTest()
{
- super();
- connection_count = Integer.getInteger("con_count",10);
- connection_idle_time = Long.getLong("con_idle_time", 5000);
+ super();
}
public void test()
@@ -67,13 +73,7 @@ public class ResourceLeakTest extends BaseTest
Session[] sessions = new Session[connection_count];
MessageConsumer[] msgCons = new MessageConsumer[connection_count];
MessageProducer [] msgProds = new MessageProducer[connection_count];
- Destination dest = new AMQQueue(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- new AMQShortString(queue_name),
- true, //exclusive
- true // auto delete
- );
-
+
while (true)
{
for (int i = 0; i < connection_count; i++)
@@ -83,23 +83,36 @@ public class ResourceLeakTest extends BaseTest
cons[i] = con;
Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
sessions[i] = ssn;
+ Destination dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key + i),
+ new AMQShortString(queue_name + i),
+ true, //exclusive
+ true // auto delete
+ );
MessageConsumer msgCon = ssn.createConsumer(dest);
msgCons[i] = msgCon;
MessageProducer msgProd = ssn.createProducer(dest);
msgProds[i] = msgProd;
-
- BytesMessage msg = ssn.createBytesMessage();
+ }
+
+ // Select some connections randomly and send/recv messages
+ // Exercise around quarter of the connections
+ for (int i=0; i < connection_count/4; i++)
+ {
+ int k = rand.nextInt(connection_count);
+
+ BytesMessage msg = sessions[k].createBytesMessage();
msg.writeBytes("Test Msg".getBytes());
for (int j = 0; j < msg_count;j++)
{
- msgProd.send(msg);
+ msgProds[k].send(msg);
}
int j = 0;
while (j < msg_count)
{
- msgCon.receive();
+ msgCons[k].receive();
j++;
}
}
@@ -110,10 +123,24 @@ public class ResourceLeakTest extends BaseTest
{
for (int i = 0; i < connection_count; i++)
{
- msgCons[i].close();
- msgProds[i].close();
- sessions[i].close();
- cons[i].close();
+ if (!((BasicMessageConsumer)msgCons[i]).isClosed())
+ {
+ msgCons[i].close();
+ }
+
+ if (!((BasicMessageProducer)msgProds[i]).isClosed())
+ {
+ msgProds[i].close();
+ }
+
+ if (!((AMQSession)sessions[i]).isClosed())
+ {
+ sessions[i].close();
+ }
+ if (!((AMQConnection)cons[i]).isClosed())
+ {
+ cons[i].close();
+ }
}
}
catch (Exception e)
@@ -131,8 +158,23 @@ public class ResourceLeakTest extends BaseTest
public static void main(String[] args)
{
- ResourceLeakTest test = new ResourceLeakTest();
- test.test();
- }
+ final ResourceLeakTest test = new ResourceLeakTest();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating test thread",e);
+ }
+ }*/
}