summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-07 11:30:56 +0000
committerKeith Wall <kwall@apache.org>2012-02-07 11:30:56 +0000
commita62ee47c4a7b03c05b1a534a7c189c76aa3ded04 (patch)
treefd4999f79257df73e575e55041adfcc1a6ee9b71
parent666303157da36f5c4226db8db15ed5035533192c (diff)
downloadqpid-python-a62ee47c4a7b03c05b1a534a7c189c76aa3ded04.tar.gz
QPID-3807: Improve thread safety of JMS Session dispatcher.
Make _dispatcherThread/_dispatcher volatile and remove their unused setters. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1241430 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
1 files changed, 11 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 94df8b2ac4..f579dcbf45 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -202,9 +202,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
- private Dispatcher _dispatcher;
+ private volatile Dispatcher _dispatcher;
- private Thread _dispatcherThread;
+ private volatile Thread _dispatcherThread;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -309,21 +309,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _dispatcher;
}
- protected void setDispatcher(Dispatcher dispatcher)
- {
- _dispatcher = dispatcher;
- }
-
protected Thread getDispatcherThread()
{
return _dispatcherThread;
}
- protected void setDispatcherThread(Thread dispatcherThread)
- {
- _dispatcherThread = dispatcherThread;
- }
-
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry getMessageFactoryRegistry()
{
@@ -820,7 +810,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (e instanceof AMQDisconnectedException)
{
- if (_dispatcher != null)
+ if (_dispatcherThread != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
_dispatcherThread.interrupt();
@@ -2326,7 +2316,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
void start() throws AMQException
{
- // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
+ // Check if the session has previously been started and suspended, in which case it must be unsuspended.
if (_startedAtLeastOnce.getAndSet(true))
{
suspendChannel(false);
@@ -2360,7 +2350,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.info("Unsuspending channel threw an exception:" + e);
+ _logger.info("Unsuspending channel threw an exception:", e);
}
}
}
@@ -2952,7 +2942,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.info("Suspending channel threw an exception:" + e);
+ _logger.info("Suspending channel threw an exception:", e);
}
}
}
@@ -3185,7 +3175,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
- private String dispatcherID = "" + System.identityHashCode(this);
+ private final String dispatcherID = "" + System.identityHashCode(this);
public Dispatcher()
{
@@ -3317,7 +3307,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -3332,7 +3322,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // ignore
+ // ignored as run will exit immediately
}
if (_dispatcherLogger.isInfoEnabled())
@@ -3383,7 +3373,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // pass
+ Thread.currentThread().interrupt();
}
if (!(message instanceof CloseConsumerMessage)
@@ -3501,7 +3491,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: " + e);
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e);
if (_logger.isDebugEnabled())
{
_logger.debug("Is the _queue empty?" + _queue.isEmpty());