diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index b2f4fcef84..b00f9dd98a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,24 +20,31 @@ */ package org.apache.qpid.client; -import javax.jms.JMSException; -import javax.jms.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.BindingURL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache; private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache; @@ -88,11 +95,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe void sendCancel() throws AMQException, FailoverException { - BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false); + BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false); - final AMQFrame cancelFrame = body.generateFrame(_channelId); + final AMQFrame cancelFrame = body.generateFrame(getChannelId()); - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); if (_logger.isDebugEnabled()) { @@ -103,9 +110,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), + return getMessageFactory().createMessage(messageFrame.getDeliveryTag(), + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), _queueDestinationCache, _topicDestinationCache); } |