diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-01 16:01:14 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-01 16:01:14 +0000 |
commit | 54f74a1c0c69250a14bdcf8d30fb54faa69cf120 (patch) | |
tree | c5abea0e4b2ee4a19aee6468d057363a3cf185a1 /java | |
parent | f7fdf57e4fd816779115a5b92deb2925eb17caf3 (diff) | |
download | qpid-python-54f74a1c0c69250a14bdcf8d30fb54faa69cf120.tar.gz |
QPID-339 Java client hangs when starting up (intermittently)
Patched the problem where the dispatcher would hang. The previous logic was flawed.
Patch worked on by Robert Godfrey and Martin Ritchie.
Added test to ensure that the connection is not automatically started.
(Only added the test last time by mistake. This is the actual fix)
With a test for the DispatcherTest
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@502253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 348 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b6429972df..c436121855 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - private final java.util.Queue<MessageConsumerPair> _reprocessQueue; - private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private long _nextProducerId; - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; + private boolean _connectionStopped; + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - + private class Dispatcher extends Thread { + + /** + * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. + */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { UnprocessedMessage message; - _stopped.set(false); + try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) { - dispatchMessage(message); + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + } } catch (InterruptedException e) @@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.info("Dispatcher thread terminating for channel " + _channelId); } + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + void setConnectionStopped(boolean connectionStopped) + { + synchronized (_lock) + { + _connectionStopped = connectionStopped; + _lock.notify(); + } + } + private void dispatchMessage(UnprocessedMessage message) { if (message.getDeliverBody() != null) @@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void stopDispatcher() + public void close() { - _stopped.set(true); + _closed.set(true); interrupt(); + + //fixme awaitTermination + } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { @@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; - _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); - if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - + public void rollback() throws JMSException { checkTransacted(); @@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // TODO: Be aware of possible changes to parameter order as versions change. getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { - checkNotClosed(); +// checkNotClosed(); return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { - checkNotClosed(); - - if (!isStopped()) - { - throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); - } - - // We are stopped - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - BasicMessageConsumer consumer = i.next(); - - if (consumer.isReceiving()) - { - throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); - } - } - - _messageListener = listener; - - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().setMessageListener(_messageListener); - } - +// checkNotClosed(); +// +// if (_dispatcher != null && !_dispatcher.connectionStopped()) +// { +// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); +// } +// +// // We are stopped +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// BasicMessageConsumer consumer = i.next(); +// +// if (consumer.isReceiving()) +// { +// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); +// } +// } +// +// _messageListener = listener; +// +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// i.next().setMessageListener(_messageListener); +// } } @@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { + //fixme This should be controlled by _stopped as it pairs with the stop method + //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. + //will result in sending Flow messages for each subsequent call to flow.. only need to do this + // if we have called stop. if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - if(hasMessageListeners() && _dispatcher == null) + if (hasMessageListeners()) { startDistpatcherIfNecessary(); } @@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if(_dispatcher == null) + if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); @@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { //stop the server delivering messages to this session suspendChannel(); - - //stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); + _dispatcher.setConnectionStopped(true); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4ffec6fb41..832c312634 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //i.e. it is only valid to call this method if // - // (a) the session is stopped, in which case the dispatcher is not running + // (a) the connection is stopped, in which case the dispatcher is not running // OR // (b) the listener is null AND we are not receiving synchronously at present // - if (_session.isStopped()) + if (!_session.getAMQConnection().started()) { _messageListener.set(messageListener); _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); if (_logger.isDebugEnabled()) { @@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); @@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - _session.startDistpatcherIfNecessary(); checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - _session.startDistpatcherIfNecessary(); - checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } diff --git a/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java new file mode 100644 index 0000000000..54453db784 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class DispatcherTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(DispatcherTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int _receivedCount = 0; + private int _receivedCountWhileStopped = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock + + private volatile boolean _connectionStopped = false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); + assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + //Set default Message Listener + try + { + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); + + _receivedCount++; + + if (_receivedCount == MSG_COUNT) + { + _allFirstMessagesSent.countDown(); + } + + if (_connectionStopped) + { + _logger.info("Running with Message:" + _receivedCount); + } + + if (_connectionStopped && _allFirstMessagesSent.getCount() == 0) + { + _receivedCountWhileStopped++; + } + + if (_allFirstMessagesSent.getCount() == 0) + { + if (_receivedCount == MSG_COUNT * 2) + { + _allSecondMessagesSent.countDown(); + } + } + } + }); + + + // FIXME Note : Should we need to call start to be able to call stop? + //_clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + _clientConnection.stop(); + _connectionStopped = true; + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + //ignore + } + + try + { + _logger.info("Restarting connection"); + + _connectionStopped = false; + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(DispatcherTest.class); + } +} |