diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 129 |
1 files changed, 37 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 388adfb434..2a37298a43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -22,11 +22,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.api.Message; import org.apache.qpid.transport.*; import org.apache.qpid.QpidException; import org.apache.qpid.filter.MessageFilter; @@ -35,16 +32,14 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageListener; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** * This is a 0.10 message consumer. */ -public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> - implements org.apache.qpid.nclient.util.MessageListener +public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> + implements org.apache.qpid.nclient.MessagePartListener { /** @@ -76,6 +71,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * Specify whether this consumer is performing a sync receive */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); + private String _consumerTagString; //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, @@ -106,6 +102,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = connection.started(); } + + @Override public void setConsumerTag(int consumerTag) + { + super.setConsumerTag(consumerTag); + _consumerTagString = String.valueOf(consumerTag); + } + + public String getConsumerTagString() + { + return _consumerTagString; + } + + // ----- Interface org.apache.qpid.client.util.MessageListener /** @@ -142,7 +151,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { if (isMessageListenerSet() && ! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } _logger.debug("messageOk, trying to notify"); @@ -155,52 +164,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By /** * This method is invoked by the transport layer when a message is delivered for this * consumer. The message is transformed and pass to the session. - * @param message an 0.10 message + * @param xfr an 0.10 message transfer */ - public void onMessage(Message message) + public void messageTransfer(MessageTransfer xfr) + + //public void onMessage(Message message) { int channelId = getSession().getChannelId(); - long deliveryId = message.getMessageTransferId(); - AMQShortString consumerTag = getConsumerTag(); - AMQShortString exchange; - AMQShortString routingKey; - boolean redelivered = false; - Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - if (headers[0] == null) { - headers[0] = new MessageProperties(); - } - if( message.getDeliveryProperties() != null ) - { - exchange = new AMQShortString(message.getDeliveryProperties().getExchange()); - routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey()); - redelivered = message.getDeliveryProperties().getRedelivered(); - } - else - { - exchange = new AMQShortString(""); - routingKey = new AMQShortString(""); - headers[1] = new DeliveryProperties(); - } + int consumerTag = getConsumerTag(); + UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); - try - { - newMessage.receiveBody(message.readData()); - } - catch (IOException e) - { - getSession().getAMQConnection().exceptionReceived(e); - } - // if there is a replyto destination then we need to request the exchange info - ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo(); - if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals("")) - { - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - // the exchnage class will be set later from within the sesion thread - String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey(); - newMessage.setReplyToURL(replyToUrl); - } - newMessage.setContentHeader(headers); + new UnprocessedMessage_0_10(consumerTag, xfr); + + getSession().messageReceived(newMessage); // else ignore this message } @@ -213,47 +189,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString()); + ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); ((AMQSession_0_10) getSession()).getQpidSession().sync(); // confirm cancel getSession().confirmConsumerCancelled(getConsumerTag()); ((AMQSession_0_10) getSession()).getCurrentException(); } - @Override void notifyMessage(UnprocessedMessage messageFrame) + @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - // if there is a replyto destination then we need to request the exchange info - String replyToURL = messageFrame.getReplyToURL(); - if (replyToURL != null && !replyToURL.equals("")) - { - AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/'))); - String replyToUrl = "://" + replyToURL; - if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl; - } - else - { - Future<ExchangeQueryResult> future = - ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString()); - ExchangeQueryResult res = future.get(); - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - replyToUrl = res.getType() + replyToUrl; - } - ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl); - } + super.notifyMessage(messageFrame); } @@ -267,11 +212,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( - AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception + AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception { - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), - messageFrame.getContentHeader(), messageFrame.getBodies() - ); + AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession()); + return _messageFactory.createMessage(msg.getMessageTransfer()); } // private methods @@ -327,7 +271,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // and messages are not prefetched we then need to request another one if(! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } } @@ -415,7 +359,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.setMessageListener(messageListener); if (messageListener != null && ! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } if (messageListener != null && !_synchronousQueue.isEmpty()) @@ -440,7 +384,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = true; if (_syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } } @@ -463,7 +407,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } if (! getSession().prefetch()) @@ -486,4 +430,5 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _session.acknowledgeMessage(msg.getDeliveryTag(), false); } } + } |