diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 529 |
1 files changed, 529 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java new file mode 100644 index 0000000000..964c238946 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -0,0 +1,529 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.DestSyntax; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.*; +import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.*; +import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.filter.JMSSelectorFilter; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This is a 0.10 message consumer. + */ +public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> +{ + + /** + * This class logger + */ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + /** + * The message selector filter associated with this consumer message selector + */ + private MessageFilter _filter = null; + + /** + * The underlying QpidSession + */ + private AMQSession_0_10 _0_10session; + + /** + * Indicates whether this consumer receives pre-acquired messages + */ + private boolean _preAcquire = true; + + /** + * Indicate whether this consumer is started. + */ + private boolean _isStarted = false; + + /** + * Specify whether this consumer is performing a sync receive + */ + private final AtomicBoolean _syncReceive = new AtomicBoolean(false); + private String _consumerTagString; + + private long capacity = 0; + + //--- constructor + protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, + AMQSession session, AMQProtocolHandler protocolHandler, + FieldTable arguments, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + throws JMSException + { + super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, + arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + _0_10session = (AMQSession_0_10) session; + if (messageSelector != null && !messageSelector.equals("")) + { + try + { + _filter = new JMSSelectorFilter(messageSelector); + } + catch (AMQInternalException e) + { + throw new InvalidSelectorException("cannot create consumer because of selector issue"); + } + if (destination instanceof AMQQueue) + { + _preAcquire = false; + } + } + _isStarted = connection.started(); + + // Destination setting overrides connection defaults + if (destination.getDestSyntax() == DestSyntax.ADDR && + destination.getLink().getConsumerCapacity() > 0) + { + capacity = destination.getLink().getConsumerCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) + { + boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; + + if (!namedQueue) + { + _destination = destination.copyDestination(); + _destination.setQueueName(null); + } + } + } + + + @Override public void setConsumerTag(int consumerTag) + { + super.setConsumerTag(consumerTag); + _consumerTagString = String.valueOf(consumerTag); + } + + public String getConsumerTagString() + { + return _consumerTagString; + } + + /** + * + * This is invoked by the session thread when emptying the session message queue. + * We first check if the message is valid (match the selector) and then deliver it to the + * message listener or to the sync consumer queue. + * + * @param jmsMessage this message has already been processed so can't redo preDeliver + */ + @Override public void notifyMessage(AbstractJMSMessage jmsMessage) + { + try + { + if (checkPreConditions(jmsMessage)) + { + if (isMessageListenerSet() && capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + _logger.debug("messageOk, trying to notify"); + super.notifyMessage(jmsMessage); + } + } + catch (AMQException e) + { + _logger.error("Receivecd an Exception when receiving message",e); + getSession().getAMQConnection().exceptionReceived(e); + } + } + + //----- overwritten methods + + /** + * This method is invoked when this consumer is stopped. + * It tells the broker to stop delivering messages to this consumer. + */ + @Override void sendCancel() throws AMQException + { + _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + try + { + _0_10session.getQpidSession().sync(); + getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel + } + catch (SessionException se) + { + _0_10session.setCurrentException(se); + } + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) + { + super.notifyMessage(messageFrame); + } + + @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + super.preApplicationProcessing(jmsMsg); + if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE) + { + _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); + } + } + + @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( + AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception + { + AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession()); + return _messageFactory.createMessage(msg.getMessageTransfer()); + } + + // private methods + /** + * Check whether a message can be delivered to this consumer. + * + * @param message The message to be checked. + * @return true if the message matches the selector and can be acquired, false otherwise. + * @throws AMQException If the message preConditions cannot be checked due to some internal error. + */ + private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException + { + boolean messageOk = true; + // TODO Use a tag for fiding out if message filtering is done here or by the broker. + try + { + if (_messageSelector != null && !_messageSelector.equals("")) + { + messageOk = _filter.matches(message); + } + } + catch (Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("messageOk " + messageOk); + _logger.debug("_preAcquire " + _preAcquire); + } + if (!messageOk) + { + if (_preAcquire) + { + // this is the case for topics + // We need to ack this message + if (_logger.isDebugEnabled()) + { + _logger.debug("filterMessage - trying to ack message"); + } + acknowledgeMessage(message); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Message not OK, releasing"); + } + releaseMessage(message); + } + // if we are syncrhonously waiting for a message + // and messages are not prefetched we then need to request another one + if(capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + } + // now we need to acquire this message if needed + // this is the case of queue with a message selector set + if (!_preAcquire && messageOk && !isNoConsume()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("filterMessage - trying to acquire message"); + } + messageOk = acquireMessage(message); + _logger.debug("filterMessage - message acquire status : " + messageOk); + } + return messageOk; + } + + + /** + * Acknowledge a message + * + * @param message The message to be acknowledged + * @throws AMQException If the message cannot be acquired due to some internal error. + */ + private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException + { + if (!_preAcquire) + { + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + } + + /** + * Release a message + * + * @param message The message to be released + * @throws AMQException If the message cannot be released due to some internal error. + */ + private void releaseMessage(AbstractJMSMessage message) throws AMQException + { + if (_preAcquire) + { + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.getQpidSession().messageRelease(ranges); + _0_10session.sync(); + } + } + + /** + * Acquire a message + * + * @param message The message to be acquired + * @return true if the message has been acquired, false otherwise. + * @throws AMQException If the message cannot be acquired due to some internal error. + */ + private boolean acquireMessage(AbstractJMSMessage message) throws AMQException + { + boolean result = false; + if (!_preAcquire) + { + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + + Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); + + RangeSet acquired = acq.getTransfers(); + if (acquired != null && acquired.size() > 0) + { + result = true; + } + } + return result; + } + + + public void setMessageListener(final MessageListener messageListener) throws JMSException + { + super.setMessageListener(messageListener); + if (messageListener != null && capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + if (messageListener != null && !_synchronousQueue.isEmpty()) + { + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } + } + } + + public void failedOverPost() + { + if (_0_10session.isStarted() && _syncReceive.get()) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + } + + /** + * When messages are not prefetched we need to request a message from the + * broker. + * Note that if the timeout is too short a message may be queued in _synchronousQueue until + * this consumer closes or request it. + * @param l + * @return + * @throws InterruptedException + */ + public Object getMessageFromQueue(long l) throws InterruptedException + { + if (capacity == 0) + { + _syncReceive.set(true); + } + if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + Object o = super.getMessageFromQueue(l); + if (o == null && _0_10session.isStarted()) + { + + _0_10session.getQpidSession().messageFlush + (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); + _0_10session.getQpidSession().sync(); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.BYTE, + 0xFFFFFFFF, Option.UNRELIABLE); + + if (capacity > 0) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), + MessageCreditUnit.MESSAGE, + capacity, + Option.UNRELIABLE); + } + _0_10session.syncDispatchQueue(); + o = super.getMessageFromQueue(-1); + } + if (capacity == 0) + { + _syncReceive.set(false); + } + return o; + } + + void postDeliver(AbstractJMSMessage msg) throws JMSException + { + super.postDeliver(msg); + if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && + !_session.isInRecovery() && + _session.getAMQConnection().getSyncAck()) + { + ((AMQSession_0_10) getSession()).flushAcknowledgments(); + ((AMQSession_0_10) getSession()).getQpidSession().sync(); + } + } + + Message receiveBrowse() throws JMSException + { + return receiveNoWait(); + } + + @Override public void rollbackPendingMessages() + { + if (_synchronousQueue.size() > 0) + { + RangeSet ranges = new RangeSet(); + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) + { + + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) + { + ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag()); + iterator.remove(); + } + else + { + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); + } + } + + _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + clearReceiveQueue(); + } + } + + public boolean isExclusive() + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } + } + + void cleanupQueue() throws AMQException, FailoverException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.RECEIVER ) + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + this.getDestination().getQueueName()); + } + } + } +} |