summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java335
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java150
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java200
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java164
7 files changed, 784 insertions, 146 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 7f17dad8e6..bf812ee302 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +89,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
+ private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
+
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -135,11 +138,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
/**
+ * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _pausing = new AtomicBoolean(false);
+
+ /**
+ * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _paused = new AtomicBoolean(false);
+
+ /**
* Set when recover is called. This is to handle the case where recover() is called by application code
* during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
+ public void doDispatcherTask(DispatcherCallback dispatcherCallback)
+ {
+ synchronized (this)
+ {
+ _dispatcher.pause();
+
+ dispatcherCallback.whilePaused(_reprocessQueue);
+
+ _dispatcher.reprocess();
+ }
+ }
/**
@@ -147,6 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private class Dispatcher extends Thread
{
+ private final Logger _logger = Logger.getLogger(Dispatcher.class);
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -154,23 +180,105 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
- UnprocessedMessage message;
_stopped.set(false);
+
+ while (!_stopped.get())
+ {
+ if (_pausing.get())
+ {
+ try
+ {
+ //Wait for unpausing
+ synchronized (_pausing)
+ {
+ synchronized (_paused)
+ {
+ _paused.notify();
+ }
+
+ _logger.info("dispatcher paused");
+
+ _pausing.wait();
+ _logger.info("dispatcher notified");
+ }
+
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing... occurs when a pause request occurs will already
+ // be here if another pause event is pending
+ _logger.info("dispacher interrupted");
+ }
+
+ doReDispatch();
+
+ }
+ else
+ {
+ doNormalDispatch();
+ }
+ }
+
+ _logger.info("Dispatcher thread terminating for channel " + _channelId);
+ }
+
+ private void doNormalDispatch()
+ {
+ UnprocessedMessage message;
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
dispatchMessage(message);
}
}
catch (InterruptedException e)
{
- ;
+ _logger.info("dispatcher normal dispatch interrupted");
}
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ private void doReDispatch()
+ {
+ _logger.info("doRedispatching");
+
+ MessageConsumerPair messageConsumerPair;
+
+ if (_reprocessQueue != null)
+ {
+ _logger.info("Reprocess Queue has size:" + _reprocessQueue.size());
+ while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null))
+ {
+ reDispatchMessage(messageConsumerPair);
+ }
+ }
+
+ if (_reprocessQueue == null || _reprocessQueue.isEmpty())
+ {
+ _logger.info("Reprocess Queue emptied");
+ _pausing.set(false);
+ }
+ else
+ {
+ _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size());
+ }
+
+ }
+
+ private void reDispatchMessage(MessageConsumerPair consumerPair)
+ {
+ if (consumerPair.getItem() instanceof AbstractJMSMessage)
+ {
+ _logger.info("do renotify:" + consumerPair.getItem());
+ consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId);
+ }
+
+ // BasicMessageConsumer.notifyError(Throwable cause)
+ // will put the cause in to the list which could come out here... need to watch this.
+ }
+
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.deliverBody != null)
@@ -231,6 +339,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_stopped.set(true);
interrupt();
}
+
+ public void pause()
+ {
+ _logger.info("pausing");
+ _pausing.set(true);
+
+
+ interrupt();
+
+ synchronized (_paused)
+ {
+ try
+ {
+ _paused.wait();
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ }
+ }
+
+ public void reprocess()
+ {
+ synchronized (_pausing)
+ {
+ _logger.info("reprocessing");
+ _pausing.notify();
+ }
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -263,6 +401,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
+
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -315,7 +455,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
return new JMSBytesMessage();
@@ -324,7 +464,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
return new JMSMapMessage();
@@ -338,7 +478,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
return (ObjectMessage) new JMSObjectMessage();
@@ -354,7 +494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -364,7 +504,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -409,7 +549,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -428,7 +568,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -440,7 +580,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -451,15 +591,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
_connection.getProtocolHandler().closeSession(this);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -512,7 +652,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -653,8 +793,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false)); // requeue
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -825,8 +965,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
public MessageConsumer createBrowserConsumer(Destination destination,
- String messageSelector,
- boolean noLocal)
+ String messageSelector,
+ boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -935,11 +1075,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
+
+ //todo remove
+ e.printStackTrace();
ex.setLinkedException(e);
throw ex;
}
- synchronized(destination)
+ synchronized (destination)
{
_destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
@@ -990,16 +1133,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- 0, // ticket
- type); // type
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1014,16 +1157,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- true, // nowait
- false, // passive
- 0, // ticket
- type); // type
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1049,15 +1192,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- true, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getAMQQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getAMQQueueName();
@@ -1069,13 +1212,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ft, // arguments
- amqd.getExchangeName(), // exchange
- true, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1098,11 +1241,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
- if(consumer.isAutoClose())
+ if (consumer.isAutoClose())
{
arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- if(consumer.isNoConsume())
+ if (consumer.isNoConsume())
{
arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
@@ -1117,15 +1260,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1282,9 +1425,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
else
{
AMQShortString topicName;
- if(topic instanceof AMQTopic)
+ if (topic instanceof AMQTopic)
{
- topicName = ((AMQTopic)topic).getDestinationName();
+ topicName = ((AMQTopic) topic).getDestinationName();
}
else
{
@@ -1315,12 +1458,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1360,7 +1503,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidQueue(queue);
- return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
+ return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1410,10 +1553,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
- queueName, // queue
- routingKey); // routingKey
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1474,9 +1617,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- deliveryTag, // deliveryTag
- multiple); // multiple
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1521,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//stop the server delivering messages to this session
suspendChannel();
-//stop the dispatcher thread
+ //stop the dispatcher thread
_stopped.set(true);
}
@@ -1574,7 +1717,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
Destination dest = consumer.getDestination();
- synchronized(dest)
+ synchronized (dest)
{
if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
@@ -1639,8 +1782,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false); // active
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
@@ -1651,15 +1794,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- true); // active
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if((consumer != null) && (consumer.isAutoClose()))
+ if ((consumer != null) && (consumer.isAutoClose()))
{
consumer.closeWhenNoMessages(true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c5e97a27f6..b3ae54f982 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -34,6 +34,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
@@ -63,7 +64,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Holds an atomic reference to the listener installed.
*/
- private final AtomicReference _messageListener = new AtomicReference();
+ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/**
* The consumer tag allows us to close the consumer by sending a jmsCancel method to the
@@ -78,13 +79,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Used in the blocking receive methods to receive a message from
- * the Session thread. Argument true indicates we want strict FIFO semantics
+ * the Session thread.
+ * <p/>
+ * Or to notify of errors
+ * <p/>
+ * Argument true indicates we want strict FIFO semantics
*/
private final ArrayBlockingQueue _synchronousQueue;
private MessageFactoryRegistry _messageFactory;
- private AMQSession _session;
+ private final AMQSession _session;
private AMQProtocolHandler _protocolHandler;
@@ -141,8 +146,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private Thread _receivingThread;
/**
- * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
- * on the queue. This is used for queue browsing.
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
*/
private boolean _autoClose;
private boolean _closeWhenNoMessages;
@@ -179,14 +184,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
- checkPreConditions();
- return (MessageListener) _messageListener.get();
+ checkPreConditions();
+ return _messageListener.get();
}
public int getAcknowledgeMode()
@@ -199,9 +204,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _messageListener.get() != null;
}
- public void setMessageListener(MessageListener messageListener) throws JMSException
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
//if the current listener is non-null and the session is not stopped, then
//it is an error to call this method.
@@ -216,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_session.isStopped())
{
_messageListener.set(messageListener);
- _logger.debug("Message listener set for destination " + _destination);
+ _logger.debug("Session stopped : Message listener set for destination " + _destination);
}
else
{
@@ -228,18 +233,35 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
}
+
_logger.debug("Message listener set for destination " + _destination);
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher is blocked
- //doing a put on the _synchronousQueue
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
- if (jmsMsg != null)
+ //handle case where connection has already been started, and the dispatcher has alreaded started
+ // putting values on the _synchronousQueue
+
+ synchronized (_session)
{
- preApplicationProcessing(jmsMsg);
- messageListener.onMessage(jmsMsg);
- postDeliver(jmsMsg);
+ //Pause Dispatcher
+ _session.doDispatcherTask(new DispatcherCallback(this)
+ {
+ public void whilePaused(Queue<MessageConsumerPair> reprocessQueue)
+ {
+ // Prepend messages in _synchronousQueue to dispatcher queue
+ _logger.debug("ReprocessQueue current size:" + reprocessQueue.size());
+ for (Object item : _synchronousQueue)
+ {
+ reprocessQueue.offer(new MessageConsumerPair(_consumer, item));
+ }
+ _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size());
+
+ // Set Message Listener
+ _logger.debug("Set Message Listener");
+ _messageListener.set(messageListener);
+ }
+ }
+ );
}
}
}
@@ -247,7 +269,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
@@ -314,13 +336,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
acquireReceiving();
try
{
- if(closeOnAutoClose())
+ if (closeOnAutoClose())
{
return null;
}
@@ -355,7 +377,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean closeOnAutoClose() throws JMSException
{
- if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
{
close(false);
return true;
@@ -368,13 +390,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
acquireReceiving();
try
{
- if(closeOnAutoClose())
+ if (closeOnAutoClose())
{
return null;
}
@@ -430,19 +452,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- if(sendClose)
+ if (sendClose)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- _consumerTag, // consumerTag
- false); // nowait
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -499,7 +521,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
messageFrame.contentHeader,
messageFrame.bodies);
- if(debug)
+ if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
@@ -507,6 +529,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
preDeliver(jmsMessage);
+ notifyMessage(jmsMessage, channelId);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ }
+ else
+ {
+ _logger.error("Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ /**
+ * @param jmsMessage this message has already been processed so can't redo preDeliver
+ * @param channelId
+ */
+ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ {
+ try
+ {
if (isMessageListenerSet())
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
@@ -517,6 +562,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
+ //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}
@@ -524,11 +570,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (e instanceof InterruptedException)
{
- _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
}
else
{
- _logger.error("Caught exception (dump follows) - ignoring...", e);
+ _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
}
}
}
@@ -550,7 +596,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void postDeliver(AbstractJMSMessage msg) throws JMSException
{
- msg.setJMSDestination(_destination);
+ msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
case Session.CLIENT_ACKNOWLEDGE:
@@ -613,6 +659,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closed.set(true);
+ //QPID-293 can "request redelivery of this error through dispatcher"
+
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
// deal with the case where we have a synchronous receive() waiting for a message to arrive
if (!isMessageListenerSet())
@@ -626,13 +674,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
deregisterConsumer();
}
+
/**
* Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
* case and in the case of an error occurring.
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(this);
+ _session.deregisterConsumer(this);
}
public AMQShortString getConsumerTag()
@@ -645,26 +694,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_consumerTag = consumerTag;
}
- public AMQSession getSession() {
- return _session;
- }
+ public AMQSession getSession()
+ {
+ return _session;
+ }
- private void checkPreConditions() throws JMSException{
+ private void checkPreConditions() throws JMSException
+ {
- this.checkNotClosed();
+ this.checkNotClosed();
- if(_session == null || _session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
- }
+ if (_session == null || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
public void acknowledge() throws JMSException
{
- if(!isClosed())
+ if (!isClosed())
{
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
- while(tags.hasNext())
+ while (tags.hasNext())
{
_session.acknowledgeMessage(tags.next(), false);
tags.remove();
@@ -699,10 +751,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closeWhenNoMessages = b;
- if(_closeWhenNoMessages
- && _synchronousQueue.isEmpty()
- && _receiving.get()
- && _messageListener != null)
+ if (_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
{
_receivingThread.interrupt();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
new file mode 100644
index 0000000000..81a55006ed
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Queue;
+
+public abstract class DispatcherCallback
+{
+ BasicMessageConsumer _consumer;
+
+ public DispatcherCallback(BasicMessageConsumer mc)
+ {
+ _consumer = mc;
+ }
+
+ abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue);
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
new file mode 100644
index 0000000000..585d6db3fd
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+public class MessageConsumerPair
+{
+ BasicMessageConsumer _consumer;
+ Object _item;
+
+ public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
+ {
+ _consumer = consumer;
+ _item = item;
+ }
+
+ public BasicMessageConsumer getConsumer()
+ {
+ return _consumer;
+ }
+
+ public Object getItem()
+ {
+ return _item;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 32a25bd915..c6d6731967 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -86,7 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
}
else
{
- _logger.warn("No Provider URL specified.");
+ _logger.info("No Provider URL specified.");
}
}
catch (IOException ioe)
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
new file mode 100644
index 0000000000..58aaaf56b8
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int receivedCount1 = 0;
+ private int receivedCount2 = 0;
+ private Connection _clientConnection;
+ private MessageConsumer _consumer1;
+ private MessageConsumer _consumer2;
+
+ private boolean _testAsync;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer1 = clientSession1.createConsumer(queue);
+
+ //Create Client 2
+ Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer2 = clientSession2.createConsumer(queue);
+
+ //Create Producer
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testRecieveC1thenC2() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+ public void testRecieveInterleaved() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
+
+ receivedCount1++;
+ }
+ });
+
+ _consumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+ receivedCount2++;
+ }
+ });
+
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ Thread.sleep(6000);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
new file mode 100644
index 0000000000..b99593aaa5
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerTest extends TestCase implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 5;
+ private int receivedCount = 0;
+ private MessageConsumer _consumer;
+ private Connection _clientConnection;
+ private boolean _testAsync;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _consumer = clientSession.createConsumer(queue);
+
+ //Create Producer
+
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testSynchronousRecieve() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(_consumer.receive() != null);
+ }
+ }
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer.setMessageListener(this);
+
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.info("Received Message(" + receivedCount + "):" + message);
+
+ receivedCount++;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerTest.class);
+ }
+}