summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-17 13:31:48 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-17 13:31:48 +0000
commitb7f5e7f45c681f8bbc5a86ed0917427749ff6c47 (patch)
treeef7c23c441e2bda1c2fe20da28e03c1614bba1de
parent47ac1646674d11de0002613a71cf4588a8c8af7a (diff)
downloadqpid-python-b7f5e7f45c681f8bbc5a86ed0917427749ff6c47.tar.gz
Hand patched bug fixes from post persistence changes
Revision: 496661 Author: ritchiem Date: 11:13:38, 16 January 2007 Message: QPID-300 Updated BlockingMethodFrameListener so it passed FailoverExceptions without wrapping in AMQExceptions. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Revision: 496658 Author: rgreig Date: 10:51:04, 16 January 2007 Message: QPID-299 Messages not being correctly requeued when transacted session closed ---- Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Revision: 496641 Author: ritchiem Date: 09:43:37, 16 January 2007 Message: QPID-293 Added DispatcherCallback and MessageConsumerPair to allow Processed Messages to be returned to the consumer for redelivery whilst pausing the dispatcher. AMQSession updated to create the callback and populate the queue. Created two test cases that check the messages are correctly delivered with and without message listeners for 1 and 2 clients. Minor non-JIRA related. PropertiesFileInitialContextFactory dropped a warn log to info. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@497016 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java150
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java92
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java200
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java164
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java88
10 files changed, 760 insertions, 30 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index c5b45659cf..2fb3a0511f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -337,7 +337,8 @@ public class AMQChannel
}
}
unsubscribeAllConsumers(session);
- requeue();
+ requeue();
+ _txnBuffer.commit();
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0dfd469d8d..59a871a6d6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,6 +93,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
+ private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
+
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -139,11 +142,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
/**
+ * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _pausing = new AtomicBoolean(false);
+
+ /**
+ * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _paused = new AtomicBoolean(false);
+
+ /**
* Set when recover is called. This is to handle the case where recover() is called by application code
* during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
+ public void doDispatcherTask(DispatcherCallback dispatcherCallback)
+ {
+ synchronized (this)
+ {
+ _dispatcher.pause();
+
+ dispatcherCallback.whilePaused(_reprocessQueue);
+
+ _dispatcher.reprocess();
+ }
+ }
/**
@@ -151,6 +175,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private class Dispatcher extends Thread
{
+ private final Logger _logger = Logger.getLogger(Dispatcher.class);
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -158,23 +184,105 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
- UnprocessedMessage message;
_stopped.set(false);
+
+ while (!_stopped.get())
+ {
+ if (_pausing.get())
+ {
+ try
+ {
+ //Wait for unpausing
+ synchronized (_pausing)
+ {
+ synchronized (_paused)
+ {
+ _paused.notify();
+ }
+
+ _logger.info("dispatcher paused");
+
+ _pausing.wait();
+ _logger.info("dispatcher notified");
+ }
+
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing... occurs when a pause request occurs will already
+ // be here if another pause event is pending
+ _logger.info("dispacher interrupted");
+ }
+
+ doReDispatch();
+
+ }
+ else
+ {
+ doNormalDispatch();
+ }
+ }
+
+ _logger.info("Dispatcher thread terminating for channel " + _channelId);
+ }
+
+ private void doNormalDispatch()
+ {
+ UnprocessedMessage message;
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
dispatchMessage(message);
}
}
catch (InterruptedException e)
{
- ;
+ _logger.info("dispatcher normal dispatch interrupted");
}
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ private void doReDispatch()
+ {
+ _logger.info("doRedispatching");
+
+ MessageConsumerPair messageConsumerPair;
+
+ if (_reprocessQueue != null)
+ {
+ _logger.info("Reprocess Queue has size:" + _reprocessQueue.size());
+ while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null))
+ {
+ reDispatchMessage(messageConsumerPair);
+ }
+ }
+
+ if (_reprocessQueue == null || _reprocessQueue.isEmpty())
+ {
+ _logger.info("Reprocess Queue emptied");
+ _pausing.set(false);
+ }
+ else
+ {
+ _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size());
+ }
+
+ }
+
+ private void reDispatchMessage(MessageConsumerPair consumerPair)
+ {
+ if (consumerPair.getItem() instanceof AbstractJMSMessage)
+ {
+ _logger.info("do renotify:" + consumerPair.getItem());
+ consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId);
+ }
+
+ // BasicMessageConsumer.notifyError(Throwable cause)
+ // will put the cause in to the list which could come out here... need to watch this.
+ }
+
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.deliverBody != null)
@@ -235,6 +343,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_stopped.set(true);
interrupt();
}
+
+ public void pause()
+ {
+ _logger.info("pausing");
+ _pausing.set(true);
+
+
+ interrupt();
+
+ synchronized (_paused)
+ {
+ try
+ {
+ _paused.wait();
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ }
+ }
+
+ public void reprocess()
+ {
+ synchronized (_pausing)
+ {
+ _logger.info("reprocessing");
+ _pausing.notify();
+ }
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -267,6 +405,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
+
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -1583,7 +1723,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//stop the server delivering messages to this session
suspendChannel();
-//stop the dispatcher thread
+ //stop the dispatcher thread
_stopped.set(true);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 1033e827de..bdf26e6a5e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -39,6 +39,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
@@ -68,7 +69,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Holds an atomic reference to the listener installed.
*/
- private final AtomicReference _messageListener = new AtomicReference();
+ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/**
* The consumer tag allows us to close the consumer by sending a jmsCancel method to the
@@ -83,13 +84,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Used in the blocking receive methods to receive a message from
- * the Session thread. Argument true indicates we want strict FIFO semantics
+ * the Session thread.
+ * <p/>
+ * Or to notify of errors
+ * <p/>
+ * Argument true indicates we want strict FIFO semantics
*/
private final ArrayBlockingQueue _synchronousQueue;
private MessageFactoryRegistry _messageFactory;
- private AMQSession _session;
+ private final AMQSession _session;
private AMQProtocolHandler _protocolHandler;
@@ -146,8 +151,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private Thread _receivingThread;
/**
- * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
- * on the queue. This is used for queue browsing.
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
*/
private boolean _autoClose;
private boolean _closeWhenNoMessages;
@@ -190,8 +195,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public MessageListener getMessageListener() throws JMSException
{
- checkPreConditions();
- return (MessageListener) _messageListener.get();
+ checkPreConditions();
+ return _messageListener.get();
}
public int getAcknowledgeMode()
@@ -204,7 +209,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _messageListener.get() != null;
}
- public void setMessageListener(MessageListener messageListener) throws JMSException
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
{
checkPreConditions();
@@ -221,7 +226,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_session.isStopped())
{
_messageListener.set(messageListener);
- _logger.debug("Message listener set for destination " + _destination);
+ _logger.debug("Session stopped : Message listener set for destination " + _destination);
}
else
{
@@ -237,14 +242,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher is blocked
- //doing a put on the _synchronousQueue
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
- if (jmsMsg != null)
+ //handle case where connection has already been started, and the dispatcher has alreaded started
+ // putting values on the _synchronousQueue
+
+ synchronized (_session)
{
- preApplicationProcessing(jmsMsg);
- messageListener.onMessage(jmsMsg);
- postDeliver(jmsMsg);
+ //Pause Dispatcher
+ _session.doDispatcherTask(new DispatcherCallback(this)
+ {
+ public void whilePaused(Queue<MessageConsumerPair> reprocessQueue)
+ {
+ // Prepend messages in _synchronousQueue to dispatcher queue
+ _logger.debug("ReprocessQueue current size:" + reprocessQueue.size());
+ for (Object item : _synchronousQueue)
+ {
+ reprocessQueue.offer(new MessageConsumerPair(_consumer, item));
+ }
+ _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size());
+
+ // Set Message Listener
+ _logger.debug("Set Message Listener");
+ _messageListener.set(messageListener);
+ }
+ }
+ );
}
}
}
@@ -498,7 +519,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
- if (_logger.isDebugEnabled())
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug)
{
_logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
}
@@ -509,11 +532,37 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
messageFrame.contentHeader,
messageFrame.bodies);
- _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ if (debug)
+ {
+ _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ }
jmsMessage.setConsumer(this);
preDeliver(jmsMessage);
+ notifyMessage(jmsMessage, channelId);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ }
+ else
+ {
+ _logger.error("Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ /**
+ * @param jmsMessage this message has already been processed so can't redo preDeliver
+ * @param channelId
+ */
+ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ {
+ try
+ {
if (isMessageListenerSet())
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
@@ -524,6 +573,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
+ //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}
@@ -531,11 +581,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (e instanceof InterruptedException)
{
- _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
}
else
{
- _logger.error("Caught exception (dump follows) - ignoring...", e);
+ _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
}
}
}
@@ -620,6 +670,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closed.set(true);
+ //QPID-293 can "request redelivery of this error through dispatcher"
+
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
// deal with the case where we have a synchronous receive() waiting for a message to arrive
if (!isMessageListenerSet())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
new file mode 100644
index 0000000000..81a55006ed
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Queue;
+
+public abstract class DispatcherCallback
+{
+ BasicMessageConsumer _consumer;
+
+ public DispatcherCallback(BasicMessageConsumer mc)
+ {
+ _consumer = mc;
+ }
+
+ abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue);
+
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
new file mode 100644
index 0000000000..585d6db3fd
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+public class MessageConsumerPair
+{
+ BasicMessageConsumer _consumer;
+ Object _item;
+
+ public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
+ {
+ _consumer = consumer;
+ _item = item;
+ }
+
+ public BasicMessageConsumer getConsumer()
+ {
+ return _consumer;
+ }
+
+ public Object getItem()
+ {
+ return _item;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 21ae3fc71f..5342eb86f6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQMethodBody;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -49,6 +50,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/**
* This method is called by the MINA dispatching thread. Note that it could
* be called before blockForFrame() has been called.
+ *
* @param evt the frame event
* @return true if the listener has dealt with this frame
* @throws AMQException
@@ -106,11 +108,16 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
if (_error instanceof AMQException)
{
- throw (AMQException)_error;
+ throw(AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw(FailoverException) _error; // needed to expose FailoverException.
}
else
{
- throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error);
}
}
@@ -120,6 +127,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/**
* This is a callback, called by the MINA dispatcher thread only. It is also called from within this
* class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ *
* @param e
*/
public void error(Exception e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index a49869212c..309ed38256 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -85,7 +85,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
}
else
{
- _logger.warn("No Provider URL specified.");
+ _logger.info("No Provider URL specified.");
}
}
catch (IOException ioe)
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
new file mode 100644
index 0000000000..58aaaf56b8
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int receivedCount1 = 0;
+ private int receivedCount2 = 0;
+ private Connection _clientConnection;
+ private MessageConsumer _consumer1;
+ private MessageConsumer _consumer2;
+
+ private boolean _testAsync;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer1 = clientSession1.createConsumer(queue);
+
+ //Create Client 2
+ Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer2 = clientSession2.createConsumer(queue);
+
+ //Create Producer
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testRecieveC1thenC2() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+ public void testRecieveInterleaved() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
+
+ receivedCount1++;
+ }
+ });
+
+ _consumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+ receivedCount2++;
+ }
+ });
+
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ Thread.sleep(6000);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
new file mode 100644
index 0000000000..b99593aaa5
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerTest extends TestCase implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 5;
+ private int receivedCount = 0;
+ private MessageConsumer _consumer;
+ private Connection _clientConnection;
+ private boolean _testAsync;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _consumer = clientSession.createConsumer(queue);
+
+ //Create Producer
+
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testSynchronousRecieve() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(_consumer.receive() != null);
+ }
+ }
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer.setMessageListener(this);
+
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.info("Received Message(" + receivedCount + "):" + message);
+
+ receivedCount++;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 90a11307b8..ca678d1967 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -23,7 +23,12 @@ package org.apache.qpid.test.unit.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
+
import javax.jms.*;
@@ -47,10 +52,12 @@ public class TransactedTest extends TestCase
private Session testSession;
private MessageConsumer testConsumer1;
private MessageConsumer testConsumer2;
+ private static final Logger _logger = Logger.getLogger(TransactedTest.class);
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
queue1 = new AMQQueue("Q1", false);
queue2 = new AMQQueue("Q2", false);
@@ -86,6 +93,7 @@ public class TransactedTest extends TestCase
con.close();
testCon.close();
prepCon.close();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
@@ -132,6 +140,84 @@ public class TransactedTest extends TestCase
assertTrue(null == testConsumer2.receive(1000));
}
+
+ public void testResendsMsgsAfterSessionClose() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue3 = new AMQQueue("Q3", false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue3);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ producerSession.commit();
+
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+
+ tm.acknowledge();
+ consumerSession.commit();
+
+ _logger.info("Received and acknowledged first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
+
+ consumerSession.close();
+
+ consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ consumer = consumerSession.createConsumer(queue3);
+
+ // no ack for last three messages so when I call recover I expect to get three messages back
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+ consumerSession.commit();
+ _logger.info("Calling acknowledge with no outstanding messages");
+ // all acked so no messages to be delivered
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ con2.close();
+
+ }
+
+
+
+
private void expect(String text, Message msg) throws JMSException
{
assertTrue(msg instanceof TextMessage);
@@ -140,6 +226,6 @@ public class TransactedTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+ return new junit.framework.TestSuite(TransactedTest.class);
}
}