diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 74 |
1 files changed, 42 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3b6179dd07..26bb51b821 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,21 +19,32 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AMQMessageDelegate_0_10; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM /** * This class logger */ - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); /** * The underlying QpidSession @@ -67,7 +78,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private final long _capacity; /** Flag indicating if the server supports message selectors */ - protected final boolean _serverJmsSelectorSupport; + private final boolean _serverJmsSelectorSupport; protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -80,11 +91,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - _preAcquire = evaluatePreAcquire(browseOnly, destination); - - _capacity = evaluateCapacity(destination); _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport); + _capacity = evaluateCapacity(destination); if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -92,8 +102,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (!namedQueue) { - _destination = destination.copyDestination(); - _destination.setQueueName(null); + setDestination(destination.copyDestination()); + getDestination().setQueueName(null); } } } @@ -181,14 +191,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { super.preDeliver(jmsMsg); - if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) { //For 0-10 we need to ensure that all messages are indicated processed in some way to //ensure their AMQP command-id is marked completed, and so we must send a completion //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring. //Add message to the unacked message list to ensure we dont lose record of it before //sending a completion of some sort. - _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); + getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } } @@ -196,7 +206,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception { AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession()); - return _messageFactory.createMessage(msg.getMessageTransfer()); + return getMessageFactory().createMessage(msg.getMessageTransfer()); } /** @@ -211,9 +221,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM boolean messageOk = true; try { - if (_messageSelectorFilter != null && !_serverJmsSelectorSupport) + if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null) { - messageOk = _messageSelectorFilter.matches(message); + messageOk = getMessageSelectorFilter().matches(message); } } catch (Exception e) @@ -274,7 +284,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _0_10session.messageAcknowledge (Range.newInstance((int) message.getDeliveryTag()), - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); final AMQException amqe = _0_10session.getCurrentException(); if (amqe != null) @@ -338,20 +348,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { messageFlow(); } - if (messageListener != null && !_synchronousQueue.isEmpty()) + if (messageListener != null && !getSynchronousQueue().isEmpty()) { - Iterator messages=_synchronousQueue.iterator(); + Iterator messages= getSynchronousQueue().iterator(); while (messages.hasNext()) { AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); messages.remove(); - _session.rejectMessage(message, true); + getSession().rejectMessage(message, true); } } } catch(TransportException e) { - throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e); + throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e); } } @@ -378,7 +388,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _syncReceive.set(true); } - if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty()) { messageFlow(); } @@ -415,19 +425,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { super.postDeliver(msg); - switch (_acknowledgeMode) + switch (getAcknowledgeMode()) { case Session.SESSION_TRANSACTED: _0_10session.sendTxCompletionsIfNecessary(); break; case Session.NO_ACKNOWLEDGE: - if (!_session.isInRecovery()) + if (!getSession().isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + getSession().acknowledgeMessage(msg.getDeliveryTag(), false); } break; case Session.AUTO_ACKNOWLEDGE: - if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) + if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck()) { ((AMQSession_0_10) getSession()).getQpidSession().sync(); } @@ -443,10 +453,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override public void rollbackPendingMessages() { - if (_synchronousQueue.size() > 0) + if (getSynchronousQueue().size() > 0) { RangeSet ranges = RangeSetFactory.createRangeSet(); - Iterator iterator = _synchronousQueue.iterator(); + Iterator iterator = getSynchronousQueue().iterator(); while (iterator.hasNext()) { @@ -486,7 +496,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } else { - return _exclusive; + return super.isExclusive(); } } @@ -514,7 +524,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _preAcquire; } - private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination) + private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport) { boolean preAcquire; if (browseOnly) @@ -524,7 +534,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM else { boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE); - if (isQueue && getMessageSelectorFilter() != null) + if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null) { preAcquire = false; } |