summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java74
1 files changed, 42 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 3b6179dd07..26bb51b821 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,21 +19,32 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
/**
* This class logger
*/
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
/**
* The underlying QpidSession
@@ -67,7 +78,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final long _capacity;
/** Flag indicating if the server supports message selectors */
- protected final boolean _serverJmsSelectorSupport;
+ private final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -80,11 +91,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- _preAcquire = evaluatePreAcquire(browseOnly, destination);
-
- _capacity = evaluateCapacity(destination);
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport);
+ _capacity = evaluateCapacity(destination);
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -92,8 +102,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (!namedQueue)
{
- _destination = destination.copyDestination();
- _destination.setQueueName(null);
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
}
}
}
@@ -181,14 +191,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
super.preDeliver(jmsMsg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
//For 0-10 we need to ensure that all messages are indicated processed in some way to
//ensure their AMQP command-id is marked completed, and so we must send a completion
//even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
//Add message to the unacked message list to ensure we dont lose record of it before
//sending a completion of some sort.
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+ getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -196,7 +206,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
- return _messageFactory.createMessage(msg.getMessageTransfer());
+ return getMessageFactory().createMessage(msg.getMessageTransfer());
}
/**
@@ -211,9 +221,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
boolean messageOk = true;
try
{
- if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
+ if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null)
{
- messageOk = _messageSelectorFilter.matches(message);
+ messageOk = getMessageSelectorFilter().matches(message);
}
}
catch (Exception e)
@@ -274,7 +284,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_0_10session.messageAcknowledge
(Range.newInstance((int) message.getDeliveryTag()),
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final AMQException amqe = _0_10session.getCurrentException();
if (amqe != null)
@@ -338,20 +348,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
messageFlow();
}
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ if (messageListener != null && !getSynchronousQueue().isEmpty())
{
- Iterator messages=_synchronousQueue.iterator();
+ Iterator messages= getSynchronousQueue().iterator();
while (messages.hasNext())
{
AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
messages.remove();
- _session.rejectMessage(message, true);
+ getSession().rejectMessage(message, true);
}
}
}
catch(TransportException e)
{
- throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+ throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
}
}
@@ -378,7 +388,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -415,19 +425,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
super.postDeliver(msg);
- switch (_acknowledgeMode)
+ switch (getAcknowledgeMode())
{
case Session.SESSION_TRANSACTED:
_0_10session.sendTxCompletionsIfNecessary();
break;
case Session.NO_ACKNOWLEDGE:
- if (!_session.isInRecovery())
+ if (!getSession().isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
}
break;
case Session.AUTO_ACKNOWLEDGE:
- if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
{
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
@@ -443,10 +453,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
@Override public void rollbackPendingMessages()
{
- if (_synchronousQueue.size() > 0)
+ if (getSynchronousQueue().size() > 0)
{
RangeSet ranges = RangeSetFactory.createRangeSet();
- Iterator iterator = _synchronousQueue.iterator();
+ Iterator iterator = getSynchronousQueue().iterator();
while (iterator.hasNext())
{
@@ -486,7 +496,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
else
{
- return _exclusive;
+ return super.isExclusive();
}
}
@@ -514,7 +524,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _preAcquire;
}
- private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport)
{
boolean preAcquire;
if (browseOnly)
@@ -524,7 +534,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
else
{
boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
- if (isQueue && getMessageSelectorFilter() != null)
+ if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null)
{
preAcquire = false;
}