diff options
author | Keith Wall <kwall@apache.org> | 2012-02-07 11:30:56 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-02-07 11:30:56 +0000 |
commit | a62ee47c4a7b03c05b1a534a7c189c76aa3ded04 (patch) | |
tree | fd4999f79257df73e575e55041adfcc1a6ee9b71 | |
parent | 666303157da36f5c4226db8db15ed5035533192c (diff) | |
download | qpid-python-a62ee47c4a7b03c05b1a534a7c189c76aa3ded04.tar.gz |
QPID-3807: Improve thread safety of JMS Session dispatcher.
Make _dispatcherThread/_dispatcher volatile and remove their unused setters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1241430 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 32 |
1 files changed, 11 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 94df8b2ac4..f579dcbf45 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -202,9 +202,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); - private Dispatcher _dispatcher; + private volatile Dispatcher _dispatcher; - private Thread _dispatcherThread; + private volatile Thread _dispatcherThread; private MessageFactoryRegistry _messageFactoryRegistry; @@ -309,21 +309,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _dispatcher; } - protected void setDispatcher(Dispatcher dispatcher) - { - _dispatcher = dispatcher; - } - protected Thread getDispatcherThread() { return _dispatcherThread; } - protected void setDispatcherThread(Thread dispatcherThread) - { - _dispatcherThread = dispatcherThread; - } - /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry getMessageFactoryRegistry() { @@ -820,7 +810,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (e instanceof AMQDisconnectedException) { - if (_dispatcher != null) + if (_dispatcherThread != null) { // Failover failed and ain't coming back. Knife the dispatcher. _dispatcherThread.interrupt(); @@ -2326,7 +2316,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ void start() throws AMQException { - // Check if the session has perviously been started and suspended, in which case it must be unsuspended. + // Check if the session has previously been started and suspended, in which case it must be unsuspended. if (_startedAtLeastOnce.getAndSet(true)) { suspendChannel(false); @@ -2360,7 +2350,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.info("Unsuspending channel threw an exception:" + e); + _logger.info("Unsuspending channel threw an exception:", e); } } } @@ -2952,7 +2942,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.info("Suspending channel threw an exception:" + e); + _logger.info("Suspending channel threw an exception:", e); } } } @@ -3185,7 +3175,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); - private String dispatcherID = "" + System.identityHashCode(this); + private final String dispatcherID = "" + System.identityHashCode(this); public Dispatcher() { @@ -3317,7 +3307,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // ignore + Thread.currentThread().interrupt(); } } } @@ -3332,7 +3322,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // ignore + // ignored as run will exit immediately } if (_dispatcherLogger.isInfoEnabled()) @@ -3383,7 +3373,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + Thread.currentThread().interrupt(); } if (!(message instanceof CloseConsumerMessage) @@ -3501,7 +3491,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: " + e); + _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e); if (_logger.isDebugEnabled()) { _logger.debug("Is the _queue empty?" + _queue.isEmpty()); |