summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java155
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java253
3 files changed, 348 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b6429972df..c436121855 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private long _nextProducerId;
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
/**
* Set when recover is called. This is to handle the case where recover() is called by application code
@@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private boolean _inRecovery;
+ private boolean _connectionStopped;
+
private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
-
+
private class Dispatcher extends Thread
{
+
+ /**
+ * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+ */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
UnprocessedMessage message;
- _stopped.set(false);
+
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
- dispatchMessage(message);
+ synchronized (_lock)
+ {
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ dispatchMessage(message);
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ }
+
}
}
catch (InterruptedException e)
@@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ // only call while holding lock
+ final boolean connectionStopped()
+ {
+ return _connectionStopped;
+ }
+
+ void setConnectionStopped(boolean connectionStopped)
+ {
+ synchronized (_lock)
+ {
+ _connectionStopped = connectionStopped;
+ _lock.notify();
+ }
+ }
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void stopDispatcher()
+ public void close()
{
- _stopped.set(true);
+ _closed.set(true);
interrupt();
+
+ //fixme awaitTermination
+
}
}
-
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
@@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
- _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
+
public void rollback() throws JMSException
{
checkTransacted();
@@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// TODO: Be aware of possible changes to parameter order as versions change.
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false)); // requeue
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageListener getMessageListener() throws JMSException
{
- checkNotClosed();
+// checkNotClosed();
return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- checkNotClosed();
-
- if (!isStopped())
- {
- throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
- }
-
- // We are stopped
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- BasicMessageConsumer consumer = i.next();
-
- if (consumer.isReceiving())
- {
- throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
- }
- }
-
- _messageListener = listener;
-
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().setMessageListener(_messageListener);
- }
-
+// checkNotClosed();
+//
+// if (_dispatcher != null && !_dispatcher.connectionStopped())
+// {
+// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+// }
+//
+// // We are stopped
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// BasicMessageConsumer consumer = i.next();
+//
+// if (consumer.isReceiving())
+// {
+// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+// }
+// }
+//
+// _messageListener = listener;
+//
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// i.next().setMessageListener(_messageListener);
+// }
}
@@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
+ //fixme This should be controlled by _stopped as it pairs with the stop method
+ //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
+ //will result in sending Flow messages for each subsequent call to flow.. only need to do this
+ // if we have called stop.
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- if(hasMessageListeners() && _dispatcher == null)
+ if (hasMessageListeners())
{
startDistpatcherIfNecessary();
}
@@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if(_dispatcher == null)
+ if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
@@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
//stop the server delivering messages to this session
suspendChannel();
-
- //stop the dispatcher thread
- _stopped.set(true);
- }
-
- boolean isStopped()
- {
- return _stopped.get();
+ _dispatcher.setConnectionStopped(true);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4ffec6fb41..832c312634 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
//i.e. it is only valid to call this method if
//
- // (a) the session is stopped, in which case the dispatcher is not running
+ // (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
- if (_session.isStopped())
+ if (!_session.getAMQConnection().started())
{
_messageListener.set(messageListener);
_session.setHasMessageListeners();
- _session.startDistpatcherIfNecessary();
if (_logger.isDebugEnabled())
{
@@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
-
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- _session.startDistpatcherIfNecessary();
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- _session.startDistpatcherIfNecessary();
-
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
new file mode 100644
index 0000000000..54453db784
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class DispatcherTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(DispatcherTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int _receivedCount = 0;
+ private int _receivedCountWhileStopped = 0;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+
+ private volatile boolean _connectionStopped = false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
+ assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
+
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testAsynchronousRecieve()
+ {
+
+ _logger.info("Test Start");
+
+ //Set default Message Listener
+ try
+ {
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
+
+ _receivedCount++;
+
+ if (_receivedCount == MSG_COUNT)
+ {
+ _allFirstMessagesSent.countDown();
+ }
+
+ if (_connectionStopped)
+ {
+ _logger.info("Running with Message:" + _receivedCount);
+ }
+
+ if (_connectionStopped && _allFirstMessagesSent.getCount() == 0)
+ {
+ _receivedCountWhileStopped++;
+ }
+
+ if (_allFirstMessagesSent.getCount() == 0)
+ {
+ if (_receivedCount == MSG_COUNT * 2)
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ }
+ });
+
+
+ // FIXME Note : Should we need to call start to be able to call stop?
+ //_clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+
+
+ try
+ {
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ try
+ {
+ _clientConnection.stop();
+ _connectionStopped = true;
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+
+ try
+ {
+ _logger.error("Send additional messages");
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ try
+ {
+ _logger.info("Restarting connection");
+
+ _connectionStopped = false;
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(DispatcherTest.class);
+ }
+}