diff options
3 files changed, 348 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b6429972df..c436121855 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - private final java.util.Queue<MessageConsumerPair> _reprocessQueue; - private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private long _nextProducerId; - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; + private boolean _connectionStopped; + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - + private class Dispatcher extends Thread { + + /** + * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. + */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { UnprocessedMessage message; - _stopped.set(false); + try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) { - dispatchMessage(message); + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + } } catch (InterruptedException e) @@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.info("Dispatcher thread terminating for channel " + _channelId); } + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + void setConnectionStopped(boolean connectionStopped) + { + synchronized (_lock) + { + _connectionStopped = connectionStopped; + _lock.notify(); + } + } + private void dispatchMessage(UnprocessedMessage message) { if (message.getDeliverBody() != null) @@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void stopDispatcher() + public void close() { - _stopped.set(true); + _closed.set(true); interrupt(); + + //fixme awaitTermination + } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { @@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; - _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); - if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - + public void rollback() throws JMSException { checkTransacted(); @@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // TODO: Be aware of possible changes to parameter order as versions change. getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { - checkNotClosed(); +// checkNotClosed(); return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { - checkNotClosed(); - - if (!isStopped()) - { - throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); - } - - // We are stopped - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - BasicMessageConsumer consumer = i.next(); - - if (consumer.isReceiving()) - { - throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); - } - } - - _messageListener = listener; - - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().setMessageListener(_messageListener); - } - +// checkNotClosed(); +// +// if (_dispatcher != null && !_dispatcher.connectionStopped()) +// { +// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); +// } +// +// // We are stopped +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// BasicMessageConsumer consumer = i.next(); +// +// if (consumer.isReceiving()) +// { +// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); +// } +// } +// +// _messageListener = listener; +// +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// i.next().setMessageListener(_messageListener); +// } } @@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { + //fixme This should be controlled by _stopped as it pairs with the stop method + //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. + //will result in sending Flow messages for each subsequent call to flow.. only need to do this + // if we have called stop. if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - if(hasMessageListeners() && _dispatcher == null) + if (hasMessageListeners()) { startDistpatcherIfNecessary(); } @@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if(_dispatcher == null) + if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); @@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { //stop the server delivering messages to this session suspendChannel(); - - //stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); + _dispatcher.setConnectionStopped(true); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4ffec6fb41..832c312634 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //i.e. it is only valid to call this method if // - // (a) the session is stopped, in which case the dispatcher is not running + // (a) the connection is stopped, in which case the dispatcher is not running // OR // (b) the listener is null AND we are not receiving synchronously at present // - if (_session.isStopped()) + if (!_session.getAMQConnection().started()) { _messageListener.set(messageListener); _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); if (_logger.isDebugEnabled()) { @@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); @@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - _session.startDistpatcherIfNecessary(); checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - _session.startDistpatcherIfNecessary(); - checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } diff --git a/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java new file mode 100644 index 0000000000..54453db784 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class DispatcherTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(DispatcherTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int _receivedCount = 0; + private int _receivedCountWhileStopped = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock + + private volatile boolean _connectionStopped = false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); + assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + //Set default Message Listener + try + { + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); + + _receivedCount++; + + if (_receivedCount == MSG_COUNT) + { + _allFirstMessagesSent.countDown(); + } + + if (_connectionStopped) + { + _logger.info("Running with Message:" + _receivedCount); + } + + if (_connectionStopped && _allFirstMessagesSent.getCount() == 0) + { + _receivedCountWhileStopped++; + } + + if (_allFirstMessagesSent.getCount() == 0) + { + if (_receivedCount == MSG_COUNT * 2) + { + _allSecondMessagesSent.countDown(); + } + } + } + }); + + + // FIXME Note : Should we need to call start to be able to call stop? + //_clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + _clientConnection.stop(); + _connectionStopped = true; + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + //ignore + } + + try + { + _logger.info("Restarting connection"); + + _connectionStopped = false; + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(DispatcherTest.class); + } +} |