diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 182 |
1 files changed, 83 insertions, 99 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 548e274571..b5f3501e5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,11 +19,10 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; @@ -66,13 +65,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private boolean _preAcquire = true; /** + * Indicate whether this consumer is started. + */ + private boolean _isStarted = false; + + /** * Specify whether this consumer is performing a sync receive */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; private long capacity = 0; - + + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -98,6 +103,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _preAcquire = false; } } + _isStarted = connection.started(); // Destination setting overrides connection defaults if (destination.getDestSyntax() == DestSyntax.ADDR && @@ -150,20 +156,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (isMessageListenerSet() && capacity == 0) { - messageFlow(); + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } - else - { - // if we are synchronously waiting for a message - // and messages are not pre-fetched we then need to request another one - if(capacity == 0) - { - messageFlow(); - } - } } catch (AMQException e) { @@ -172,6 +171,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } + //----- overwritten methods + /** * This method is invoked when this consumer is stopped. * It tells the broker to stop delivering messages to this consumer. @@ -201,18 +202,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super.notifyMessage(messageFrame); } - @Override - protected void preDeliver(AbstractJMSMessage jmsMsg) + @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - super.preDeliver(jmsMsg); - - if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + super.preApplicationProcessing(jmsMsg); + if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE) { - //For 0-10 we need to ensure that all messages are indicated processed in some way to - //ensure their AMQP command-id is marked completed, and so we must send a completion - //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring. - //Add message to the unacked message list to ensure we dont lose record of it before - //sending a completion of some sort. _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } } @@ -224,6 +218,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _messageFactory.createMessage(msg.getMessageTransfer()); } + // private methods /** * Check whether a message can be delivered to this consumer. * @@ -252,7 +247,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); } - if (!messageOk) { if (_preAcquire) @@ -269,12 +263,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_logger.isDebugEnabled()) { - _logger.debug("filterMessage - not ack'ing message as not acquired"); + _logger.debug("Message not OK, releasing"); } - flushUnwantedMessage(message); + releaseMessage(message); + } + // if we are syncrhonously waiting for a message + // and messages are not prefetched we then need to request another one + if(capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } - // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk && !isNoConsume()) @@ -286,7 +287,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM messageOk = acquireMessage(message); _logger.debug("filterMessage - message acquire status : " + messageOk); } - return messageOk; } @@ -297,38 +297,38 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @param message The message to be acknowledged * @throws AMQException If the message cannot be acquired due to some internal error. */ - private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException + private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException { - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.messageAcknowledge - (ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - - final AMQException amqe = _0_10session.getCurrentException(); - if (amqe != null) + if (!_preAcquire) { - throw amqe; + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } } /** - * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated - * processed to ensure their AMQP command-id is marked completed. + * Release a message * - * @param message The unwanted message to be flushed - * @throws AMQException If the unwanted message cannot be flushed due to some internal error. + * @param message The message to be released + * @throws AMQException If the message cannot be released due to some internal error. */ - private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException + private void releaseMessage(AbstractJMSMessage message) throws AMQException { - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.flushProcessed(ranges,false); - - final AMQException amqe = _0_10session.getCurrentException(); - if (amqe != null) + if (_preAcquire) { - throw amqe; + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.getQpidSession().messageRelease(ranges); + _0_10session.sync(); } } @@ -339,52 +339,44 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @return true if the message has been acquired, false otherwise. * @throws AMQException If the message cannot be acquired due to some internal error. */ - private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException + private boolean acquireMessage(AbstractJMSMessage message) throws AMQException { boolean result = false; - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); + if (!_preAcquire) + { + RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); - final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); + Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); - final RangeSet acquired = acq.getTransfers(); - if (acquired != null && acquired.size() > 0) - { - result = true; + RangeSet acquired = acq.getTransfers(); + if (acquired != null && acquired.size() > 0) + { + result = true; + } } return result; } - private void messageFlow() - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - try + if (messageListener != null && capacity == 0) { - if (messageListener != null && capacity == 0) - { - messageFlow(); - } - if (messageListener != null && !_synchronousQueue.isEmpty()) - { - Iterator messages=_synchronousQueue.iterator(); - while (messages.hasNext()) - { - AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); - messages.remove(); - _session.rejectMessage(message, true); - } - } + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } - catch(TransportException e) + if (messageListener != null && !_synchronousQueue.isEmpty()) { - throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e); + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } } } @@ -392,7 +384,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_0_10session.isStarted() && _syncReceive.get()) { - messageFlow(); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } @@ -413,7 +407,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) { - messageFlow(); + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } Object o = super.getMessageFromQueue(l); if (o == null && _0_10session.isStarted()) @@ -444,7 +440,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return o; } - void postDeliver(AbstractJMSMessage msg) + void postDeliver(AbstractJMSMessage msg) throws JMSException { super.postDeliver(msg); if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) @@ -453,8 +449,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && - !_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) + !_session.isInRecovery() && + _session.getAMQConnection().getSyncAck()) { + ((AMQSession_0_10) getSession()).flushAcknowledgments(); ((AMQSession_0_10) getSession()).getQpidSession().sync(); } } @@ -511,18 +509,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _exclusive; } } - - void cleanupQueue() throws AMQException, FailoverException - { - AMQDestination dest = this.getDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.RECEIVER ) - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); - } - } - } } |