diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-16 09:43:37 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-16 09:43:37 +0000 |
commit | 27e91272b84568baad05f8a30bb9d5b5650dad76 (patch) | |
tree | e71f9efa7b4a7f110ec247f25bcba9c7d789b4b3 /java | |
parent | 1e5641f47a0f4eb2c2de9d5f871c46aed8c859cd (diff) | |
download | qpid-python-27e91272b84568baad05f8a30bb9d5b5650dad76.tar.gz |
QPID-293
Added DispatcherCallback and MessageConsumerPair to allow Processed Messages to be returned to the consumer for redelivery whilst pausing the dispatcher.
AMQSession updated to create the callback and populate the queue.
Created two test cases that check the messages are correctly delivered with and without message listeners for 1 and 2 clients.
Minor non-JIRA related.
PropertiesFileInitialContextFactory dropped a warn log to info.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496641 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 784 insertions, 146 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7f17dad8e6..bf812ee302 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -88,6 +89,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; + private final java.util.Queue<MessageConsumerPair> _reprocessQueue; + private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -135,11 +138,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private volatile AtomicBoolean _stopped = new AtomicBoolean(true); /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _pausing = new AtomicBoolean(false); + + /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _paused = new AtomicBoolean(false); + + /** * Set when recover is called. This is to handle the case where recover() is called by application code * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; + public void doDispatcherTask(DispatcherCallback dispatcherCallback) + { + synchronized (this) + { + _dispatcher.pause(); + + dispatcherCallback.whilePaused(_reprocessQueue); + + _dispatcher.reprocess(); + } + } /** @@ -147,6 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private class Dispatcher extends Thread { + private final Logger _logger = Logger.getLogger(Dispatcher.class); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -154,23 +180,105 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { - UnprocessedMessage message; _stopped.set(false); + + while (!_stopped.get()) + { + if (_pausing.get()) + { + try + { + //Wait for unpausing + synchronized (_pausing) + { + synchronized (_paused) + { + _paused.notify(); + } + + _logger.info("dispatcher paused"); + + _pausing.wait(); + _logger.info("dispatcher notified"); + } + + } + catch (InterruptedException e) + { + //do nothing... occurs when a pause request occurs will already + // be here if another pause event is pending + _logger.info("dispacher interrupted"); + } + + doReDispatch(); + + } + else + { + doNormalDispatch(); + } + } + + _logger.info("Dispatcher thread terminating for channel " + _channelId); + } + + private void doNormalDispatch() + { + UnprocessedMessage message; try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } } catch (InterruptedException e) { - ; + _logger.info("dispatcher normal dispatch interrupted"); } - _logger.info("Dispatcher thread terminating for channel " + _channelId); } + private void doReDispatch() + { + _logger.info("doRedispatching"); + + MessageConsumerPair messageConsumerPair; + + if (_reprocessQueue != null) + { + _logger.info("Reprocess Queue has size:" + _reprocessQueue.size()); + while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null)) + { + reDispatchMessage(messageConsumerPair); + } + } + + if (_reprocessQueue == null || _reprocessQueue.isEmpty()) + { + _logger.info("Reprocess Queue emptied"); + _pausing.set(false); + } + else + { + _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size()); + } + + } + + private void reDispatchMessage(MessageConsumerPair consumerPair) + { + if (consumerPair.getItem() instanceof AbstractJMSMessage) + { + _logger.info("do renotify:" + consumerPair.getItem()); + consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId); + } + + // BasicMessageConsumer.notifyError(Throwable cause) + // will put the cause in to the list which could come out here... need to watch this. + } + + private void dispatchMessage(UnprocessedMessage message) { if (message.deliverBody != null) @@ -231,6 +339,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _stopped.set(true); interrupt(); } + + public void pause() + { + _logger.info("pausing"); + _pausing.set(true); + + + interrupt(); + + synchronized (_paused) + { + try + { + _paused.wait(); + } + catch (InterruptedException e) + { + //do nothing + } + } + } + + public void reprocess() + { + synchronized (_pausing) + { + _logger.info("reprocessing"); + _pausing.notify(); + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -263,6 +401,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; + _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); + if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -315,7 +455,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSBytesMessage(); @@ -324,7 +464,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSMapMessage(); @@ -338,7 +478,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return (ObjectMessage) new JMSObjectMessage(); @@ -354,7 +494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -364,7 +504,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -409,7 +549,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class); + _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class); } catch (AMQException e) { @@ -428,7 +568,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class); + TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); } catch (AMQException e) { @@ -440,7 +580,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -451,15 +591,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { _connection.getProtocolHandler().closeSession(this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + (byte) 8, (byte) 0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -512,7 +652,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -653,8 +793,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false)); // requeue + (byte) 8, (byte) 0, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -825,8 +965,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) + String messageSelector, + boolean noLocal) throws JMSException { checkValidDestination(destination); @@ -935,11 +1075,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); + + //todo remove + e.printStackTrace(); ex.setLinkedException(e); throw ex; } - synchronized(destination) + synchronized (destination) { _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); _destinationConsumerCount.get(destination).incrementAndGet(); @@ -990,16 +1133,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + false, // nowait + false, // passive + 0, // ticket + type); // type _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1014,16 +1157,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - true, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1049,15 +1192,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - true, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + true, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + 0); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getAMQQueueName(); @@ -1069,13 +1212,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - true, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + ft, // arguments + amqd.getExchangeName(), // exchange + true, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + 0); // ticket protocolHandler.writeFrame(queueBind); } @@ -1098,11 +1241,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } - if(consumer.isAutoClose()) + if (consumer.isAutoClose()) { arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - if(consumer.isNoConsume()) + if (consumer.isNoConsume()) { arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } @@ -1117,15 +1260,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + 0); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1282,9 +1425,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { AMQShortString topicName; - if(topic instanceof AMQTopic) + if (topic instanceof AMQTopic) { - topicName = ((AMQTopic)topic).getDestinationName(); + topicName = ((AMQTopic) topic).getDestinationName(); } else { @@ -1315,12 +1458,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + 0); // ticket _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1360,7 +1503,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1410,10 +1553,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange - queueName, // queue - routingKey); // routingKey + (byte) 8, (byte) 0, // AMQP version (major, minor) + ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + queueName, // queue + routingKey); // routingKey AMQMethodEvent response = null; try { @@ -1474,9 +1617,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + (byte) 8, (byte) 0, // AMQP version (major, minor) + deliveryTag, // deliveryTag + multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); @@ -1521,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //stop the server delivering messages to this session suspendChannel(); -//stop the dispatcher thread + //stop the dispatcher thread _stopped.set(true); } @@ -1574,7 +1717,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } Destination dest = consumer.getDestination(); - synchronized(dest) + synchronized (dest) { if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) { @@ -1639,8 +1782,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + false); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } @@ -1651,15 +1794,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - true); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + true); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if((consumer != null) && (consumer.isAutoClose())) + if ((consumer != null) && (consumer.isAutoClose())) { consumer.closeWhenNoMessages(true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c5e97a27f6..b3ae54f982 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -34,6 +34,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; @@ -63,7 +64,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Holds an atomic reference to the listener installed. */ - private final AtomicReference _messageListener = new AtomicReference(); + private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); /** * The consumer tag allows us to close the consumer by sending a jmsCancel method to the @@ -78,13 +79,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Used in the blocking receive methods to receive a message from - * the Session thread. Argument true indicates we want strict FIFO semantics + * the Session thread. + * <p/> + * Or to notify of errors + * <p/> + * Argument true indicates we want strict FIFO semantics */ private final ArrayBlockingQueue _synchronousQueue; private MessageFactoryRegistry _messageFactory; - private AMQSession _session; + private final AMQSession _session; private AMQProtocolHandler _protocolHandler; @@ -141,8 +146,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private Thread _receivingThread; /** - * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive - * on the queue. This is used for queue browsing. + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. */ private boolean _autoClose; private boolean _closeWhenNoMessages; @@ -179,14 +184,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { - checkPreConditions(); + checkPreConditions(); return _messageSelector; } public MessageListener getMessageListener() throws JMSException { - checkPreConditions(); - return (MessageListener) _messageListener.get(); + checkPreConditions(); + return _messageListener.get(); } public int getAcknowledgeMode() @@ -199,9 +204,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _messageListener.get() != null; } - public void setMessageListener(MessageListener messageListener) throws JMSException + public void setMessageListener(final MessageListener messageListener) throws JMSException { - checkPreConditions(); + checkPreConditions(); //if the current listener is non-null and the session is not stopped, then //it is an error to call this method. @@ -216,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_session.isStopped()) { _messageListener.set(messageListener); - _logger.debug("Message listener set for destination " + _destination); + _logger.debug("Session stopped : Message listener set for destination " + _destination); } else { @@ -228,18 +233,35 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } + _logger.debug("Message listener set for destination " + _destination); if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher is blocked - //doing a put on the _synchronousQueue - AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); - if (jmsMsg != null) + //handle case where connection has already been started, and the dispatcher has alreaded started + // putting values on the _synchronousQueue + + synchronized (_session) { - preApplicationProcessing(jmsMsg); - messageListener.onMessage(jmsMsg); - postDeliver(jmsMsg); + //Pause Dispatcher + _session.doDispatcherTask(new DispatcherCallback(this) + { + public void whilePaused(Queue<MessageConsumerPair> reprocessQueue) + { + // Prepend messages in _synchronousQueue to dispatcher queue + _logger.debug("ReprocessQueue current size:" + reprocessQueue.size()); + for (Object item : _synchronousQueue) + { + reprocessQueue.offer(new MessageConsumerPair(_consumer, item)); + } + _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size()); + + // Set Message Listener + _logger.debug("Set Message Listener"); + _messageListener.set(messageListener); + } + } + ); } } } @@ -247,7 +269,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); @@ -314,13 +336,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -355,7 +377,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean closeOnAutoClose() throws JMSException { - if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); return true; @@ -368,13 +390,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -430,19 +452,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - if(sendClose) + if (sendClose) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - _consumerTag, // consumerTag - false); // nowait + (byte) 8, (byte) 0, // AMQP version (major, minor) + _consumerTag, // consumerTag + false); // nowait try { @@ -499,7 +521,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer messageFrame.contentHeader, messageFrame.bodies); - if(debug) + if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } @@ -507,6 +529,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer preDeliver(jmsMessage); + notifyMessage(jmsMessage, channelId); + } + catch (Exception e) + { + if (e instanceof InterruptedException) + { + _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + } + else + { + _logger.error("Caught exception (dump follows) - ignoring...", e); + } + } + } + + /** + * @param jmsMessage this message has already been processed so can't redo preDeliver + * @param channelId + */ + public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + { + try + { if (isMessageListenerSet()) { //we do not need a lock around the test above, and the dispatch below as it is invalid @@ -517,6 +562,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { + //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } @@ -524,11 +570,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (e instanceof InterruptedException) { - _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing"); } else { - _logger.error("Caught exception (dump follows) - ignoring...", e); + _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e); } } } @@ -550,7 +596,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void postDeliver(AbstractJMSMessage msg) throws JMSException { - msg.setJMSDestination(_destination); + msg.setJMSDestination(_destination); switch (_acknowledgeMode) { case Session.CLIENT_ACKNOWLEDGE: @@ -613,6 +659,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); + //QPID-293 can "request redelivery of this error through dispatcher" + // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive if (!isMessageListenerSet()) @@ -626,13 +674,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); } + /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean * case and in the case of an error occurring. */ private void deregisterConsumer() { - _session.deregisterConsumer(this); + _session.deregisterConsumer(this); } public AMQShortString getConsumerTag() @@ -645,26 +694,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _consumerTag = consumerTag; } - public AMQSession getSession() { - return _session; - } + public AMQSession getSession() + { + return _session; + } - private void checkPreConditions() throws JMSException{ + private void checkPreConditions() throws JMSException + { - this.checkNotClosed(); + this.checkNotClosed(); - if(_session == null || _session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + if (_session == null || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } public void acknowledge() throws JMSException { - if(!isClosed()) + if (!isClosed()) { Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); - while(tags.hasNext()) + while (tags.hasNext()) { _session.acknowledgeMessage(tags.next(), false); tags.remove(); @@ -699,10 +751,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if(_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) { _receivingThread.interrupt(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java new file mode 100644 index 0000000000..81a55006ed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.Queue; + +public abstract class DispatcherCallback +{ + BasicMessageConsumer _consumer; + + public DispatcherCallback(BasicMessageConsumer mc) + { + _consumer = mc; + } + + abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java new file mode 100644 index 0000000000..585d6db3fd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +public class MessageConsumerPair +{ + BasicMessageConsumer _consumer; + Object _item; + + public MessageConsumerPair(BasicMessageConsumer consumer, Object item) + { + _consumer = consumer; + _item = item; + } + + public BasicMessageConsumer getConsumer() + { + return _consumer; + } + + public Object getItem() + { + return _item; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 32a25bd915..c6d6731967 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -86,7 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else { - _logger.warn("No Provider URL specified."); + _logger.info("No Provider URL specified."); } } catch (IOException ioe) diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java new file mode 100644 index 0000000000..58aaaf56b8 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerMultiConsumerTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int receivedCount1 = 0; + private int receivedCount2 = 0; + private Connection _clientConnection; + private MessageConsumer _consumer1; + private MessageConsumer _consumer2; + + private boolean _testAsync; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer1 = clientSession1.createConsumer(queue); + + //Create Client 2 + Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer2 = clientSession2.createConsumer(queue); + + //Create Producer + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testRecieveC1thenC2() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + + assertTrue(_consumer1.receive() != null); + } + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer2.receive() != null); + } + } + + public void testRecieveInterleaved() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer1.receive() != null); + assertTrue(_consumer2.receive() != null); + } + } + + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message); + + receivedCount1++; + } + }); + + _consumer2.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); + + receivedCount2++; + } + }); + + + _logger.info("Waiting 3 seconds for messages"); + + try + { + Thread.sleep(6000); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java new file mode 100644 index 0000000000..b99593aaa5 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerTest extends TestCase implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(MessageListenerTest.class); + + Context _context; + + private static final int MSG_COUNT = 5; + private int receivedCount = 0; + private MessageConsumer _consumer; + private Connection _clientConnection; + private boolean _testAsync; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + _consumer = clientSession.createConsumer(queue); + + //Create Producer + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testSynchronousRecieve() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(_consumer.receive() != null); + } + } + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer.setMessageListener(this); + + + _logger.info("Waiting 3 seconds for messages"); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + public void onMessage(Message message) + { + _logger.info("Received Message(" + receivedCount + "):" + message); + + receivedCount++; + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerTest.class); + } +} |