diff options
Diffstat (limited to 'java/client/src')
7 files changed, 784 insertions, 146 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7f17dad8e6..bf812ee302 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -88,6 +89,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; + private final java.util.Queue<MessageConsumerPair> _reprocessQueue; + private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -135,11 +138,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private volatile AtomicBoolean _stopped = new AtomicBoolean(true); /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _pausing = new AtomicBoolean(false); + + /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _paused = new AtomicBoolean(false); + + /** * Set when recover is called. This is to handle the case where recover() is called by application code * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; + public void doDispatcherTask(DispatcherCallback dispatcherCallback) + { + synchronized (this) + { + _dispatcher.pause(); + + dispatcherCallback.whilePaused(_reprocessQueue); + + _dispatcher.reprocess(); + } + } /** @@ -147,6 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private class Dispatcher extends Thread { + private final Logger _logger = Logger.getLogger(Dispatcher.class); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -154,23 +180,105 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { - UnprocessedMessage message; _stopped.set(false); + + while (!_stopped.get()) + { + if (_pausing.get()) + { + try + { + //Wait for unpausing + synchronized (_pausing) + { + synchronized (_paused) + { + _paused.notify(); + } + + _logger.info("dispatcher paused"); + + _pausing.wait(); + _logger.info("dispatcher notified"); + } + + } + catch (InterruptedException e) + { + //do nothing... occurs when a pause request occurs will already + // be here if another pause event is pending + _logger.info("dispacher interrupted"); + } + + doReDispatch(); + + } + else + { + doNormalDispatch(); + } + } + + _logger.info("Dispatcher thread terminating for channel " + _channelId); + } + + private void doNormalDispatch() + { + UnprocessedMessage message; try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } } catch (InterruptedException e) { - ; + _logger.info("dispatcher normal dispatch interrupted"); } - _logger.info("Dispatcher thread terminating for channel " + _channelId); } + private void doReDispatch() + { + _logger.info("doRedispatching"); + + MessageConsumerPair messageConsumerPair; + + if (_reprocessQueue != null) + { + _logger.info("Reprocess Queue has size:" + _reprocessQueue.size()); + while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null)) + { + reDispatchMessage(messageConsumerPair); + } + } + + if (_reprocessQueue == null || _reprocessQueue.isEmpty()) + { + _logger.info("Reprocess Queue emptied"); + _pausing.set(false); + } + else + { + _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size()); + } + + } + + private void reDispatchMessage(MessageConsumerPair consumerPair) + { + if (consumerPair.getItem() instanceof AbstractJMSMessage) + { + _logger.info("do renotify:" + consumerPair.getItem()); + consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId); + } + + // BasicMessageConsumer.notifyError(Throwable cause) + // will put the cause in to the list which could come out here... need to watch this. + } + + private void dispatchMessage(UnprocessedMessage message) { if (message.deliverBody != null) @@ -231,6 +339,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _stopped.set(true); interrupt(); } + + public void pause() + { + _logger.info("pausing"); + _pausing.set(true); + + + interrupt(); + + synchronized (_paused) + { + try + { + _paused.wait(); + } + catch (InterruptedException e) + { + //do nothing + } + } + } + + public void reprocess() + { + synchronized (_pausing) + { + _logger.info("reprocessing"); + _pausing.notify(); + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -263,6 +401,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; + _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); + if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -315,7 +455,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSBytesMessage(); @@ -324,7 +464,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSMapMessage(); @@ -338,7 +478,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return (ObjectMessage) new JMSObjectMessage(); @@ -354,7 +494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -364,7 +504,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -409,7 +549,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class); + _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class); } catch (AMQException e) { @@ -428,7 +568,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class); + TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); } catch (AMQException e) { @@ -440,7 +580,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -451,15 +591,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { _connection.getProtocolHandler().closeSession(this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + (byte) 8, (byte) 0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -512,7 +652,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -653,8 +793,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false)); // requeue + (byte) 8, (byte) 0, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -825,8 +965,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) + String messageSelector, + boolean noLocal) throws JMSException { checkValidDestination(destination); @@ -935,11 +1075,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); + + //todo remove + e.printStackTrace(); ex.setLinkedException(e); throw ex; } - synchronized(destination) + synchronized (destination) { _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); _destinationConsumerCount.get(destination).incrementAndGet(); @@ -990,16 +1133,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + false, // nowait + false, // passive + 0, // ticket + type); // type _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1014,16 +1157,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - true, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1049,15 +1192,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - true, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + true, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + 0); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getAMQQueueName(); @@ -1069,13 +1212,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - true, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + ft, // arguments + amqd.getExchangeName(), // exchange + true, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + 0); // ticket protocolHandler.writeFrame(queueBind); } @@ -1098,11 +1241,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } - if(consumer.isAutoClose()) + if (consumer.isAutoClose()) { arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - if(consumer.isNoConsume()) + if (consumer.isNoConsume()) { arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } @@ -1117,15 +1260,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + 0); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1282,9 +1425,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { AMQShortString topicName; - if(topic instanceof AMQTopic) + if (topic instanceof AMQTopic) { - topicName = ((AMQTopic)topic).getDestinationName(); + topicName = ((AMQTopic) topic).getDestinationName(); } else { @@ -1315,12 +1458,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + 0); // ticket _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1360,7 +1503,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1410,10 +1553,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange - queueName, // queue - routingKey); // routingKey + (byte) 8, (byte) 0, // AMQP version (major, minor) + ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + queueName, // queue + routingKey); // routingKey AMQMethodEvent response = null; try { @@ -1474,9 +1617,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + (byte) 8, (byte) 0, // AMQP version (major, minor) + deliveryTag, // deliveryTag + multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); @@ -1521,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //stop the server delivering messages to this session suspendChannel(); -//stop the dispatcher thread + //stop the dispatcher thread _stopped.set(true); } @@ -1574,7 +1717,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } Destination dest = consumer.getDestination(); - synchronized(dest) + synchronized (dest) { if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) { @@ -1639,8 +1782,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + false); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } @@ -1651,15 +1794,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - true); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + true); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if((consumer != null) && (consumer.isAutoClose())) + if ((consumer != null) && (consumer.isAutoClose())) { consumer.closeWhenNoMessages(true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c5e97a27f6..b3ae54f982 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -34,6 +34,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; @@ -63,7 +64,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Holds an atomic reference to the listener installed. */ - private final AtomicReference _messageListener = new AtomicReference(); + private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); /** * The consumer tag allows us to close the consumer by sending a jmsCancel method to the @@ -78,13 +79,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Used in the blocking receive methods to receive a message from - * the Session thread. Argument true indicates we want strict FIFO semantics + * the Session thread. + * <p/> + * Or to notify of errors + * <p/> + * Argument true indicates we want strict FIFO semantics */ private final ArrayBlockingQueue _synchronousQueue; private MessageFactoryRegistry _messageFactory; - private AMQSession _session; + private final AMQSession _session; private AMQProtocolHandler _protocolHandler; @@ -141,8 +146,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private Thread _receivingThread; /** - * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive - * on the queue. This is used for queue browsing. + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. */ private boolean _autoClose; private boolean _closeWhenNoMessages; @@ -179,14 +184,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { - checkPreConditions(); + checkPreConditions(); return _messageSelector; } public MessageListener getMessageListener() throws JMSException { - checkPreConditions(); - return (MessageListener) _messageListener.get(); + checkPreConditions(); + return _messageListener.get(); } public int getAcknowledgeMode() @@ -199,9 +204,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _messageListener.get() != null; } - public void setMessageListener(MessageListener messageListener) throws JMSException + public void setMessageListener(final MessageListener messageListener) throws JMSException { - checkPreConditions(); + checkPreConditions(); //if the current listener is non-null and the session is not stopped, then //it is an error to call this method. @@ -216,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_session.isStopped()) { _messageListener.set(messageListener); - _logger.debug("Message listener set for destination " + _destination); + _logger.debug("Session stopped : Message listener set for destination " + _destination); } else { @@ -228,18 +233,35 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } + _logger.debug("Message listener set for destination " + _destination); if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher is blocked - //doing a put on the _synchronousQueue - AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); - if (jmsMsg != null) + //handle case where connection has already been started, and the dispatcher has alreaded started + // putting values on the _synchronousQueue + + synchronized (_session) { - preApplicationProcessing(jmsMsg); - messageListener.onMessage(jmsMsg); - postDeliver(jmsMsg); + //Pause Dispatcher + _session.doDispatcherTask(new DispatcherCallback(this) + { + public void whilePaused(Queue<MessageConsumerPair> reprocessQueue) + { + // Prepend messages in _synchronousQueue to dispatcher queue + _logger.debug("ReprocessQueue current size:" + reprocessQueue.size()); + for (Object item : _synchronousQueue) + { + reprocessQueue.offer(new MessageConsumerPair(_consumer, item)); + } + _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size()); + + // Set Message Listener + _logger.debug("Set Message Listener"); + _messageListener.set(messageListener); + } + } + ); } } } @@ -247,7 +269,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); @@ -314,13 +336,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -355,7 +377,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean closeOnAutoClose() throws JMSException { - if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); return true; @@ -368,13 +390,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -430,19 +452,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - if(sendClose) + if (sendClose) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - _consumerTag, // consumerTag - false); // nowait + (byte) 8, (byte) 0, // AMQP version (major, minor) + _consumerTag, // consumerTag + false); // nowait try { @@ -499,7 +521,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer messageFrame.contentHeader, messageFrame.bodies); - if(debug) + if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } @@ -507,6 +529,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer preDeliver(jmsMessage); + notifyMessage(jmsMessage, channelId); + } + catch (Exception e) + { + if (e instanceof InterruptedException) + { + _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + } + else + { + _logger.error("Caught exception (dump follows) - ignoring...", e); + } + } + } + + /** + * @param jmsMessage this message has already been processed so can't redo preDeliver + * @param channelId + */ + public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + { + try + { if (isMessageListenerSet()) { //we do not need a lock around the test above, and the dispatch below as it is invalid @@ -517,6 +562,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { + //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } @@ -524,11 +570,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (e instanceof InterruptedException) { - _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing"); } else { - _logger.error("Caught exception (dump follows) - ignoring...", e); + _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e); } } } @@ -550,7 +596,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void postDeliver(AbstractJMSMessage msg) throws JMSException { - msg.setJMSDestination(_destination); + msg.setJMSDestination(_destination); switch (_acknowledgeMode) { case Session.CLIENT_ACKNOWLEDGE: @@ -613,6 +659,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); + //QPID-293 can "request redelivery of this error through dispatcher" + // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive if (!isMessageListenerSet()) @@ -626,13 +674,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); } + /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean * case and in the case of an error occurring. */ private void deregisterConsumer() { - _session.deregisterConsumer(this); + _session.deregisterConsumer(this); } public AMQShortString getConsumerTag() @@ -645,26 +694,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _consumerTag = consumerTag; } - public AMQSession getSession() { - return _session; - } + public AMQSession getSession() + { + return _session; + } - private void checkPreConditions() throws JMSException{ + private void checkPreConditions() throws JMSException + { - this.checkNotClosed(); + this.checkNotClosed(); - if(_session == null || _session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + if (_session == null || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } public void acknowledge() throws JMSException { - if(!isClosed()) + if (!isClosed()) { Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); - while(tags.hasNext()) + while (tags.hasNext()) { _session.acknowledgeMessage(tags.next(), false); tags.remove(); @@ -699,10 +751,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if(_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) { _receivingThread.interrupt(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java new file mode 100644 index 0000000000..81a55006ed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.Queue; + +public abstract class DispatcherCallback +{ + BasicMessageConsumer _consumer; + + public DispatcherCallback(BasicMessageConsumer mc) + { + _consumer = mc; + } + + abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java new file mode 100644 index 0000000000..585d6db3fd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +public class MessageConsumerPair +{ + BasicMessageConsumer _consumer; + Object _item; + + public MessageConsumerPair(BasicMessageConsumer consumer, Object item) + { + _consumer = consumer; + _item = item; + } + + public BasicMessageConsumer getConsumer() + { + return _consumer; + } + + public Object getItem() + { + return _item; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 32a25bd915..c6d6731967 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -86,7 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else { - _logger.warn("No Provider URL specified."); + _logger.info("No Provider URL specified."); } } catch (IOException ioe) diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java new file mode 100644 index 0000000000..58aaaf56b8 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerMultiConsumerTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int receivedCount1 = 0; + private int receivedCount2 = 0; + private Connection _clientConnection; + private MessageConsumer _consumer1; + private MessageConsumer _consumer2; + + private boolean _testAsync; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer1 = clientSession1.createConsumer(queue); + + //Create Client 2 + Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer2 = clientSession2.createConsumer(queue); + + //Create Producer + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testRecieveC1thenC2() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + + assertTrue(_consumer1.receive() != null); + } + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer2.receive() != null); + } + } + + public void testRecieveInterleaved() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer1.receive() != null); + assertTrue(_consumer2.receive() != null); + } + } + + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message); + + receivedCount1++; + } + }); + + _consumer2.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); + + receivedCount2++; + } + }); + + + _logger.info("Waiting 3 seconds for messages"); + + try + { + Thread.sleep(6000); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java new file mode 100644 index 0000000000..b99593aaa5 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerTest extends TestCase implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(MessageListenerTest.class); + + Context _context; + + private static final int MSG_COUNT = 5; + private int receivedCount = 0; + private MessageConsumer _consumer; + private Connection _clientConnection; + private boolean _testAsync; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + _consumer = clientSession.createConsumer(queue); + + //Create Producer + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testSynchronousRecieve() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(_consumer.receive() != null); + } + } + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer.setMessageListener(this); + + + _logger.info("Waiting 3 seconds for messages"); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + public void onMessage(Message message) + { + _logger.info("Received Message(" + receivedCount + "):" + message); + + receivedCount++; + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerTest.class); + } +} |