summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-01 16:01:14 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-01 16:01:14 +0000
commit54f74a1c0c69250a14bdcf8d30fb54faa69cf120 (patch)
treec5abea0e4b2ee4a19aee6468d057363a3cf185a1 /java/client/src/main
parentf7fdf57e4fd816779115a5b92deb2925eb17caf3 (diff)
downloadqpid-python-54f74a1c0c69250a14bdcf8d30fb54faa69cf120.tar.gz
QPID-339 Java client hangs when starting up (intermittently)
Patched the problem where the dispatcher would hang. The previous logic was flawed. Patch worked on by Robert Godfrey and Martin Ritchie. Added test to ensure that the connection is not automatically started. (Only added the test last time by mistake. This is the actual fix) With a test for the DispatcherTest git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@502253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java155
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
2 files changed, 95 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b6429972df..c436121855 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private long _nextProducerId;
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
/**
* Set when recover is called. This is to handle the case where recover() is called by application code
@@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private boolean _inRecovery;
+ private boolean _connectionStopped;
+
private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
-
+
private class Dispatcher extends Thread
{
+
+ /**
+ * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+ */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
UnprocessedMessage message;
- _stopped.set(false);
+
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
- dispatchMessage(message);
+ synchronized (_lock)
+ {
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ dispatchMessage(message);
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ }
+
}
}
catch (InterruptedException e)
@@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ // only call while holding lock
+ final boolean connectionStopped()
+ {
+ return _connectionStopped;
+ }
+
+ void setConnectionStopped(boolean connectionStopped)
+ {
+ synchronized (_lock)
+ {
+ _connectionStopped = connectionStopped;
+ _lock.notify();
+ }
+ }
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void stopDispatcher()
+ public void close()
{
- _stopped.set(true);
+ _closed.set(true);
interrupt();
+
+ //fixme awaitTermination
+
}
}
-
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
@@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
- _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
+
public void rollback() throws JMSException
{
checkTransacted();
@@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// TODO: Be aware of possible changes to parameter order as versions change.
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false)); // requeue
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageListener getMessageListener() throws JMSException
{
- checkNotClosed();
+// checkNotClosed();
return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- checkNotClosed();
-
- if (!isStopped())
- {
- throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
- }
-
- // We are stopped
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- BasicMessageConsumer consumer = i.next();
-
- if (consumer.isReceiving())
- {
- throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
- }
- }
-
- _messageListener = listener;
-
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().setMessageListener(_messageListener);
- }
-
+// checkNotClosed();
+//
+// if (_dispatcher != null && !_dispatcher.connectionStopped())
+// {
+// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+// }
+//
+// // We are stopped
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// BasicMessageConsumer consumer = i.next();
+//
+// if (consumer.isReceiving())
+// {
+// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+// }
+// }
+//
+// _messageListener = listener;
+//
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// i.next().setMessageListener(_messageListener);
+// }
}
@@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
+ //fixme This should be controlled by _stopped as it pairs with the stop method
+ //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
+ //will result in sending Flow messages for each subsequent call to flow.. only need to do this
+ // if we have called stop.
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- if(hasMessageListeners() && _dispatcher == null)
+ if (hasMessageListeners())
{
startDistpatcherIfNecessary();
}
@@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if(_dispatcher == null)
+ if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
@@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
//stop the server delivering messages to this session
suspendChannel();
-
- //stop the dispatcher thread
- _stopped.set(true);
- }
-
- boolean isStopped()
- {
- return _stopped.get();
+ _dispatcher.setConnectionStopped(true);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4ffec6fb41..832c312634 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
//i.e. it is only valid to call this method if
//
- // (a) the session is stopped, in which case the dispatcher is not running
+ // (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
- if (_session.isStopped())
+ if (!_session.getAMQConnection().started())
{
_messageListener.set(messageListener);
_session.setHasMessageListeners();
- _session.startDistpatcherIfNecessary();
if (_logger.isDebugEnabled())
{
@@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
-
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- _session.startDistpatcherIfNecessary();
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- _session.startDistpatcherIfNecessary();
-
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}