diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 125 |
1 files changed, 70 insertions, 55 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7c8ccf4cf9..71780f5714 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.jms.Session; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,11 +49,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected final Logger _logger = LoggerFactory.getLogger(getClass()); /** - * The message selector filter associated with this consumer message selector - */ - private MessageFilter _filter = null; - - /** * The underlying QpidSession */ private AMQSession_0_10 _0_10session; @@ -64,7 +56,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM /** * Indicates whether this consumer receives pre-acquired messages */ - private boolean _preAcquire = true; + private final boolean _preAcquire; /** * Specify whether this consumer is performing a sync receive @@ -72,44 +64,22 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; - private long capacity = 0; + private final long _capacity; protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, AMQProtocolHandler protocolHandler, + AMQSession<?,?> session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && !messageSelector.equals("")) - { - try - { - _filter = new JMSSelectorFilter(messageSelector); - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } - if (destination instanceof AMQQueue) - { - _preAcquire = false; - } - } - - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (getSession().prefetch()) - { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } + + _preAcquire = evaluatePreAcquire(browseOnly, destination); + + _capacity = evaluateCapacity(destination); if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - @Override public void setConsumerTag(int consumerTag) { super.setConsumerTag(consumerTag); @@ -149,7 +118,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (checkPreConditions(jmsMessage)) { - if (isMessageListenerSet() && capacity == 0) + if (isMessageListenerSet() && _capacity == 0) { messageFlow(); } @@ -160,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { // if we are synchronously waiting for a message // and messages are not pre-fetched we then need to request another one - if(capacity == 0) + if(_capacity == 0) { messageFlow(); } @@ -238,9 +207,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (_messageSelector != null && !_messageSelector.equals("")) + if (_messageSelectorFilter != null) { - messageOk = _filter.matches(message); + messageOk = _messageSelectorFilter.matches(message); } } catch (Exception e) @@ -275,11 +244,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM flushUnwantedMessage(message); } } - - // now we need to acquire this message if needed - // this is the case of queue with a message selector set - if (!_preAcquire && messageOk && !isNoConsume()) + else if (!_preAcquire && !isBrowseOnly()) { + // now we need to acquire this message if needed + // this is the case of queue with a message selector set if (_logger.isDebugEnabled()) { _logger.debug("filterMessage - trying to acquire message"); @@ -368,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super.setMessageListener(messageListener); try { - if (messageListener != null && capacity == 0) + if (messageListener != null && _capacity == 0) { messageFlow(); } @@ -408,11 +376,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(true); } - if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty()) { messageFlow(); } @@ -427,18 +395,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM (getConsumerTagString(), MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if (capacity > 0) + if (_capacity > 0) { _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.MESSAGE, - capacity, + _capacity, Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(false); } @@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } } + + long getCapacity() + { + return _capacity; + } + + boolean isPreAcquire() + { + return _preAcquire; + } + + private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination) + { + boolean preAcquire; + if (browseOnly) + { + preAcquire = false; + } + else + { + boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE); + if (isQueue && getMessageSelectorFilter() != null) + { + preAcquire = false; + } + else + { + preAcquire = true; + } + } + return preAcquire; + } + + private long evaluateCapacity(AMQDestination destination) + { + long capacity = 0; + if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0) + { + capacity = destination.getLink().getConsumerCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + return capacity; + } + } |