summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-11-21 16:32:53 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-11-21 16:32:53 +0000
commitfbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 (patch)
treee6b60960e326e941970b04ff45b00abb59d74986
parentdd41ba8fdfecb5e60e41b714945a7ef852d7b876 (diff)
downloadqpid-python-fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68.tar.gz
This is related to QPID-1479.
For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require. The ThreadFactory has two implimentations, the default being the java.lang.Threads. The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority. -Dqpid.thread_factory=<thread_factory_class> will decide which thread factory should be loaded. -Dqpid.rt_thread_priority=<int> specifies the gloabl real time thread priority and defaults to 20. You could also set individual thread priorities by adding the nessacery config+code changes. I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719628 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java27
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java38
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java23
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java22
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java17
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java14
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java20
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java23
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java22
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java65
16 files changed, 379 insertions, 71 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b5d12d9520..af0ed3faa3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +281,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
+
+ protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry _messageFactoryRegistry;
@@ -668,7 +680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcher.interrupt();
+ _dispatcherThread.interrupt();
}
}
@@ -1852,7 +1864,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void startDispatcherIfNecessary()
{
//If we are the dispatcher then we don't need to check we are started
- if (Thread.currentThread() == _dispatcher)
+ if (Thread.currentThread() == _dispatcherThread)
{
return;
}
@@ -1883,9 +1895,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
+ try
+ {
+ _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating Dispatcher thread",e);
+ }
+ _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+ _dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
- _dispatcher.start();
+ _dispatcherThread.start();
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+ }
}
else
{
@@ -2606,7 +2632,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
- class Dispatcher extends Thread
+ class Dispatcher implements Runnable
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
@@ -2615,21 +2641,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Object _lock = new Object();
private String dispatcherID = "" + System.identityHashCode(this);
-
-
public Dispatcher()
{
- super("Dispatcher-Channel-" + _channelId);
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " created");
- }
}
public void close()
{
_closed.set(true);
- interrupt();
+ _dispatcherThread.interrupt();
// fixme awaitTermination
@@ -2708,7 +2727,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " started");
+ _dispatcherLogger.info(_dispatcherThread.getName() + " started");
}
UnprocessedMessage message;
@@ -2771,7 +2790,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
new file mode 100644
index 0000000000..94869ab205
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
@@ -0,0 +1,18 @@
+package org.apache.qpid.thread;
+
+public class DefaultThreadFactory implements ThreadFactory
+{
+
+ public Thread createThread(Runnable r)
+ {
+ return new Thread(r);
+ }
+
+ public Thread createThread(Runnable r, int priority)
+ {
+ Thread t = new Thread(r);
+ t.setPriority(priority);
+ return t;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
new file mode 100644
index 0000000000..b711f749f8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
@@ -0,0 +1,47 @@
+package org.apache.qpid.thread;
+
+import java.lang.reflect.Constructor;
+
+public class RealtimeThreadFactory implements ThreadFactory
+{
+ private Class threadClass;
+ private Constructor threadConstructor;
+ private Constructor priorityParameterConstructor;
+ private int defaultRTThreadPriority = 20;
+
+ public RealtimeThreadFactory() throws Exception
+ {
+ defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20);
+ threadClass = Class.forName("javax.realtime.RealtimeThread");
+
+ Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
+ Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
+ Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
+ Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
+ Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
+
+ Class[] paramTypes = new Class[]{schedulingParametersClass,
+ releaseParametersClass,
+ memoryParametersClass,
+ memoryAreaClass,
+ processingGroupParametersClass,
+ java.lang.Runnable.class};
+
+ threadConstructor = threadClass.getConstructor(paramTypes);
+
+ Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+ priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class});
+ }
+
+ public Thread createThread(Runnable r) throws Exception
+ {
+ return createThread(r,defaultRTThreadPriority);
+ }
+
+ public Thread createThread(Runnable r, int priority) throws Exception
+ {
+ Object priorityParams = priorityParameterConstructor.newInstance(priority);
+ return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
new file mode 100644
index 0000000000..f9bcabfa3d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
@@ -0,0 +1,7 @@
+package org.apache.qpid.thread;
+
+public interface ThreadFactory
+{
+ public Thread createThread(Runnable r) throws Exception;
+ public Thread createThread(Runnable r, int priority) throws Exception;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
new file mode 100644
index 0000000000..0fb113d22c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.thread;
+
+public final class Threading
+{
+ private static ThreadFactory threadFactory;
+
+ static {
+ try
+ {
+ Class threadFactoryClass =
+ Class.forName(System.getProperty("qpid.thread_factory",
+ "org.apache.qpid.thread.DefaultThreadFactory"));
+
+ threadFactory = (ThreadFactory)threadFactoryClass.newInstance();
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error occured while loading thread factory",e);
+ }
+ }
+
+ public static ThreadFactory getThreadFactory()
+ {
+ return threadFactory;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 5efd51d5db..b245e47336 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
@@ -46,6 +47,7 @@ final class IoReceiver extends Thread
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
@@ -55,10 +57,18 @@ final class IoReceiver extends Thread
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
- setDaemon(true);
- setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- start();
+
+ try
+ {
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.start();
}
void close(boolean block)
@@ -75,10 +85,10 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (block && Thread.currentThread() != this)
+ if (block && Thread.currentThread() != receiverThread)
{
- join(timeout);
- if (isAlive())
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
{
throw new TransportException("join timed out");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 36ea14856a..29f0c766fc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -32,7 +33,7 @@ import org.apache.qpid.transport.util.Logger;
import static org.apache.qpid.transport.util.Functions.*;
-public final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender implements Runnable, Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
@@ -54,7 +55,8 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
-
+ private final Thread senderThread;
+
private volatile Throwable exception = null;
@@ -74,9 +76,18 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
throw new TransportException("Error getting output stream for socket", e);
}
- setDaemon(true);
- setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- start();
+ try
+ {
+ senderThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOSender thread",e);
+ }
+
+ senderThread.setDaemon(true);
+ senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ senderThread.start();
}
private static final int pof2(int n)
@@ -188,10 +199,10 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
try
{
- if (Thread.currentThread() != this)
+ if (Thread.currentThread() != senderThread)
{
- join(timeout);
- if (isAlive())
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
{
throw new SenderException("join timed out");
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
index 35a2374fbc..4a4f3d124b 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
@@ -37,6 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* Latency test sends an x number of messages in warmup mode and wait for a confirmation
@@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener
public static void main(String[] args)
{
- LatencyTest latencyTest = new LatencyTest();
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
+ final LatencyTest latencyTest = new LatencyTest();
+ Runnable r = new Runnable()
{
- try
+ public void run()
{
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating latency test thread",e);
}
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
index cd12c7010d..9781a7e839 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
@@ -27,6 +27,8 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* PerfConsumer will receive x no of messages in warmup mode.
* Once it receives the Start message it will then signal the PerfProducer.
@@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener
public static void main(String[] args)
{
- PerfConsumer cons = new PerfConsumer();
- cons.test();
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
index 757b1bfcda..e9421d7f22 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
@@ -27,6 +27,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
@@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase
public static void main(String[] args)
{
- PerfProducer prod = new PerfProducer();
- prod.test();
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
} \ No newline at end of file
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
index a91d9e7e85..d5514873e6 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
@@ -29,6 +29,8 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest
{
final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest
}
- });
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+
t.setName("session-" + i);
t.start();
} // for loop
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
index 279e5ea0bf..1cf4ee28ca 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
@@ -32,6 +32,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer
for (int i = 0; i < session_count; i++)
{
final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
private Random gen = new Random();
@@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer
}
- });
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
t.setName("session-" + i);
t.start();
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
index c33f9ffbf2..1ae2c35970 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -30,6 +30,7 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest
public static void main(String[] args)
{
- ResourceLeakTest test = new ResourceLeakTest();
- test.test();
+ final ResourceLeakTest test = new ResourceLeakTest();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating test thread",e);
+ }
}
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
index b3eb97dafe..cd6d9013f8 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
@@ -29,6 +29,8 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest
public static void main(String[] args)
{
- SimpleConsumer test = new SimpleConsumer();
- test.setUp();
- test.test();
+ final SimpleConsumer test = new SimpleConsumer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
}
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
index 1080092536..805ce7ac29 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
@@ -33,6 +33,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest
public static void main(String[] args)
{
- SimpleProducer test = new SimpleProducer();
- test.setUp();
- test.test();
+ final SimpleProducer test = new SimpleProducer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
}
}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
index 7411e81bd6..4bba7b113d 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -20,23 +20,45 @@
*/
package org.apache.qpid.tools;
-import java.lang.reflect.InvocationTargetException;
+import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
+import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
+import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
+
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
-import javax.jms.*;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import static org.apache.qpid.tools.QpidBench.Mode.*;
-
/**
* QpidBench
*
@@ -412,7 +434,7 @@ public class QpidBench
{
case CONSUME:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -432,7 +454,18 @@ public class QpidBench
throw new RuntimeException(e);
}
}
- }.start();
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
break;
}
@@ -440,7 +473,7 @@ public class QpidBench
{
case PUBLISH:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -460,7 +493,17 @@ public class QpidBench
throw new RuntimeException(e);
}
}
- }.start();
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating publisher thread",e);
+ }
+ t.start();
break;
}
}