diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 138 |
1 files changed, 82 insertions, 56 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 2a37298a43..1b2d6876bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * This is a 0.10 message consumer. */ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> - implements org.apache.qpid.nclient.MessagePartListener { /** @@ -114,9 +114,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _consumerTagString; } - - // ----- Interface org.apache.qpid.client.util.MessageListener - /** * * This is invoked by the session thread when emptying the session message queue. @@ -152,35 +149,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (isMessageListenerSet() && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } } - - - /** - * This method is invoked by the transport layer when a message is delivered for this - * consumer. The message is transformed and pass to the session. - * @param xfr an 0.10 message transfer - */ - public void messageTransfer(MessageTransfer xfr) - - //public void onMessage(Message message) - { - int channelId = getSession().getChannelId(); - int consumerTag = getConsumerTag(); - - UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(consumerTag, xfr); - - - getSession().messageReceived(newMessage); - // else ignore this message - } - //----- overwritten methods /** @@ -272,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if(! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } // now we need to acquire this message if needed @@ -284,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); - _logger.debug("filterMessage - *************************************"); _logger.debug("filterMessage - message acquire status : " + messageOk); - _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -304,8 +279,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageAcknowledge(ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); _0_10session.getCurrentException(); } } @@ -360,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (messageListener != null && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -374,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - public boolean isStrated() + public void failedOverPost() { - return _isStarted; - } - - public void start() - { - _isStarted = true; - if (_syncReceive.get()) + if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } - public void stop() - { - _isStarted = false; - } - /** * When messages are not prefetched we need to request a message from the * broker. @@ -405,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); - } if (! getSession().prefetch()) { _syncReceive.set(true); } + if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } Object o = super.getMessageFromQueue(l); + if (o == null && _0_10session.isStarted()) + { + _0_10session.getQpidSession().messageFlush + (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); + _0_10session.getQpidSession().sync(); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.BYTE, + 0xFFFFFFFF, Option.UNRELIABLE); + if (getSession().prefetch()) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, + _0_10session.getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + _0_10session.syncDispatchQueue(); + o = super.getMessageFromQueue(-1); + } if (! getSession().prefetch()) { _syncReceive.set(false); @@ -425,10 +411,50 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM void postDeliver(AbstractJMSMessage msg) throws JMSException { super.postDeliver(msg); - if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && + !_session.isInRecovery() && + _session.getAMQConnection().getSyncAck()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + ((AMQSession_0_10) getSession()).flushAcknowledgments(); + ((AMQSession_0_10) getSession()).getQpidSession().sync(); + } + } + + Message receiveBrowse() throws JMSException + { + return receiveNoWait(); } + @Override public void rollbackPendingMessages() + { + if (_synchronousQueue.size() > 0) + { + RangeSet ranges = new RangeSet(); + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) + { + + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) + { + ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag()); + iterator.remove(); + } + else + { + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); + } + } + + _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + clearReceiveQueue(); + } + } } |