diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-21 17:57:16 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-21 17:57:16 +0000 |
commit | 8417094a3f0c28fef298d57db5616854458b7a8b (patch) | |
tree | dbbae1591cbc7db6a0d1d9700a43f0ab1d535b04 | |
parent | fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 (diff) | |
download | qpid-python-8417094a3f0c28fef298d57db5616854458b7a8b.tar.gz |
Appologies for the sudden checkin without notice, close to the release cycle.
Reverting the changes back. Will attach a patch and commit after the release.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719657 13f79535-47bb-0310-9956-ffa450edef68
16 files changed, 71 insertions, 379 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index af0ed3faa3..b5d12d9520 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,26 +67,16 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.AMQMessageDelegateFactory; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.CloseConsumerMessage; -import org.apache.qpid.client.message.JMSBytesMessage; -import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.client.message.JMSObjectMessage; -import org.apache.qpid.client.message.JMSStreamMessage; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; -import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -281,8 +271,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; - - protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry _messageFactoryRegistry; @@ -680,7 +668,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher != null) { // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcherThread.interrupt(); + _dispatcher.interrupt(); } } @@ -1864,7 +1852,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void startDispatcherIfNecessary() { //If we are the dispatcher then we don't need to check we are started - if (Thread.currentThread() == _dispatcherThread) + if (Thread.currentThread() == _dispatcher) { return; } @@ -1895,23 +1883,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher == null) { _dispatcher = new Dispatcher(); - try - { - _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); - - } - catch(Exception e) - { - throw new Error("Error creating Dispatcher thread",e); - } - _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); - _dispatcherThread.setDaemon(true); + _dispatcher.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); - _dispatcherThread.start(); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(_dispatcherThread.getName() + " created"); - } + _dispatcher.start(); } else { @@ -2632,7 +2606,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - class Dispatcher implements Runnable + class Dispatcher extends Thread { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ @@ -2641,14 +2615,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Object _lock = new Object(); private String dispatcherID = "" + System.identityHashCode(this); + + public Dispatcher() { + super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } public void close() { _closed.set(true); - _dispatcherThread.interrupt(); + interrupt(); // fixme awaitTermination @@ -2727,7 +2708,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " started"); + _dispatcherLogger.info(getName() + " started"); } UnprocessedMessage message; @@ -2790,7 +2771,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId); + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java deleted file mode 100644 index 94869ab205..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.qpid.thread; - -public class DefaultThreadFactory implements ThreadFactory -{ - - public Thread createThread(Runnable r) - { - return new Thread(r); - } - - public Thread createThread(Runnable r, int priority) - { - Thread t = new Thread(r); - t.setPriority(priority); - return t; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java deleted file mode 100644 index b711f749f8..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.apache.qpid.thread; - -import java.lang.reflect.Constructor; - -public class RealtimeThreadFactory implements ThreadFactory -{ - private Class threadClass; - private Constructor threadConstructor; - private Constructor priorityParameterConstructor; - private int defaultRTThreadPriority = 20; - - public RealtimeThreadFactory() throws Exception - { - defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20); - threadClass = Class.forName("javax.realtime.RealtimeThread"); - - Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters"); - Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters"); - Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters"); - Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea"); - Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters"); - - Class[] paramTypes = new Class[]{schedulingParametersClass, - releaseParametersClass, - memoryParametersClass, - memoryAreaClass, - processingGroupParametersClass, - java.lang.Runnable.class}; - - threadConstructor = threadClass.getConstructor(paramTypes); - - Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters"); - priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class}); - } - - public Thread createThread(Runnable r) throws Exception - { - return createThread(r,defaultRTThreadPriority); - } - - public Thread createThread(Runnable r, int priority) throws Exception - { - Object priorityParams = priorityParameterConstructor.newInstance(priority); - return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java deleted file mode 100644 index f9bcabfa3d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.qpid.thread; - -public interface ThreadFactory -{ - public Thread createThread(Runnable r) throws Exception; - public Thread createThread(Runnable r, int priority) throws Exception; -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java deleted file mode 100644 index 0fb113d22c..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.qpid.thread; - -public final class Threading -{ - private static ThreadFactory threadFactory; - - static { - try - { - Class threadFactoryClass = - Class.forName(System.getProperty("qpid.thread_factory", - "org.apache.qpid.thread.DefaultThreadFactory")); - - threadFactory = (ThreadFactory)threadFactoryClass.newInstance(); - } - catch(Exception e) - { - throw new Error("Error occured while loading thread factory",e); - } - } - - public static ThreadFactory getThreadFactory() - { - return threadFactory; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index b245e47336..5efd51d5db 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -36,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver implements Runnable +final class IoReceiver extends Thread { private static final Logger log = Logger.get(IoReceiver.class); @@ -47,7 +46,6 @@ final class IoReceiver implements Runnable private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread receiverThread; public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) @@ -57,18 +55,10 @@ final class IoReceiver implements Runnable this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - - try - { - receiverThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new Error("Error creating IOReceiver thread",e); - } - receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - receiverThread.start(); + + setDaemon(true); + setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + start(); } void close(boolean block) @@ -85,10 +75,10 @@ final class IoReceiver implements Runnable { socket.shutdownInput(); } - if (block && Thread.currentThread() != receiverThread) + if (block && Thread.currentThread() != this) { - receiverThread.join(timeout); - if (receiverThread.isAlive()) + join(timeout); + if (isAlive()) { throw new TransportException("join timed out"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 29f0c766fc..36ea14856a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,7 +24,6 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -33,7 +32,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -public final class IoSender implements Runnable, Sender<ByteBuffer> +public final class IoSender extends Thread implements Sender<ByteBuffer> { private static final Logger log = Logger.get(IoSender.class); @@ -55,8 +54,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notFull = new Object(); private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread senderThread; - + private volatile Throwable exception = null; @@ -76,18 +74,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> throw new TransportException("Error getting output stream for socket", e); } - try - { - senderThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new Error("Error creating IOSender thread",e); - } - - senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - senderThread.start(); + setDaemon(true); + setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + start(); } private static final int pof2(int n) @@ -199,10 +188,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - if (Thread.currentThread() != senderThread) + if (Thread.currentThread() != this) { - senderThread.join(timeout); - if (senderThread.isAlive()) + join(timeout); + if (isAlive()) { throw new SenderException("join timed out"); } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 4a4f3d124b..35a2374fbc 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,7 +37,6 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -315,36 +314,19 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() + LatencyTest latencyTest = new LatencyTest(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) { - public void run() + try { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); } - t.start(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index 9781a7e839..cd12c7010d 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,8 +27,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -244,24 +242,7 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() - { - public void run() - { - cons.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); + PerfConsumer cons = new PerfConsumer(); + cons.test(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index e9421d7f22..757b1bfcda 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,7 +27,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -202,24 +201,7 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() - { - public void run() - { - prod.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); + PerfProducer prod = new PerfProducer(); + prod.test(); } }
\ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index d5514873e6..a91d9e7e85 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,8 +29,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * Test Description * ================ @@ -69,7 +67,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() + Thread t = new Thread(new Runnable() { public void run() { @@ -133,18 +131,7 @@ public class MultiThreadedConsumer extends BaseTest } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - + }); t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 1cf4ee28ca..279e5ea0bf 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,7 +32,6 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -80,7 +79,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() + Thread t = new Thread(new Runnable() { private Random gen = new Random(); @@ -143,16 +142,7 @@ public class MultiThreadedProducer extends SimpleProducer } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } + }); t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index 1ae2c35970..c33f9ffbf2 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,7 +30,6 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -132,23 +131,8 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - final ResourceLeakTest test = new ResourceLeakTest(); - Runnable r = new Runnable(){ - public void run() - { - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating test thread",e); - } + ResourceLeakTest test = new ResourceLeakTest(); + test.test(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index cd6d9013f8..b3eb97dafe 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,8 +29,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * Test Description * ================ @@ -128,24 +126,9 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - final SimpleConsumer test = new SimpleConsumer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } + SimpleConsumer test = new SimpleConsumer(); + test.setUp(); + test.test(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 805ce7ac29..1080092536 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,7 +33,6 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -139,24 +138,9 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - final SimpleProducer test = new SimpleProducer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } + SimpleProducer test = new SimpleProducer(); + test.setUp(); + test.test(); } } diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 4bba7b113d..7411e81bd6 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,45 +20,23 @@ */ package org.apache.qpid.tools; -import static org.apache.qpid.tools.QpidBench.Mode.BOTH; -import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; -import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; - -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; +import java.util.UUID; +import javax.jms.*; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; +import static org.apache.qpid.tools.QpidBench.Mode.*; + /** * QpidBench * @@ -434,7 +412,7 @@ public class QpidBench { case CONSUME: case BOTH: - Runnable r = new Runnable() + new Thread() { public void run() { @@ -454,18 +432,7 @@ public class QpidBench throw new RuntimeException(e); } } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); + }.start(); break; } @@ -473,7 +440,7 @@ public class QpidBench { case PUBLISH: case BOTH: - Runnable r = new Runnable() + new Thread() { public void run() { @@ -493,17 +460,7 @@ public class QpidBench throw new RuntimeException(e); } } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating publisher thread",e); - } - t.start(); + }.start(); break; } } |