summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java129
1 files changed, 37 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 388adfb434..2a37298a43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -22,11 +22,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.api.Message;
import org.apache.qpid.transport.*;
import org.apache.qpid.QpidException;
import org.apache.qpid.filter.MessageFilter;
@@ -35,16 +32,14 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
- implements org.apache.qpid.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+ implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -76,6 +71,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ private String _consumerTagString;
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
@@ -106,6 +102,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = connection.started();
}
+
+ @Override public void setConsumerTag(int consumerTag)
+ {
+ super.setConsumerTag(consumerTag);
+ _consumerTagString = String.valueOf(consumerTag);
+ }
+
+ public String getConsumerTagString()
+ {
+ return _consumerTagString;
+ }
+
+
// ----- Interface org.apache.qpid.client.util.MessageListener
/**
@@ -142,7 +151,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
if (isMessageListenerSet() && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
_logger.debug("messageOk, trying to notify");
@@ -155,52 +164,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
/**
* This method is invoked by the transport layer when a message is delivered for this
* consumer. The message is transformed and pass to the session.
- * @param message an 0.10 message
+ * @param xfr an 0.10 message transfer
*/
- public void onMessage(Message message)
+ public void messageTransfer(MessageTransfer xfr)
+
+ //public void onMessage(Message message)
{
int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- AMQShortString consumerTag = getConsumerTag();
- AMQShortString exchange;
- AMQShortString routingKey;
- boolean redelivered = false;
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- if (headers[0] == null) {
- headers[0] = new MessageProperties();
- }
- if( message.getDeliveryProperties() != null )
- {
- exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- redelivered = message.getDeliveryProperties().getRedelivered();
- }
- else
- {
- exchange = new AMQShortString("");
- routingKey = new AMQShortString("");
- headers[1] = new DeliveryProperties();
- }
+ int consumerTag = getConsumerTag();
+
UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- // if there is a replyto destination then we need to request the exchange info
- ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
- if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
- {
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- // the exchnage class will be set later from within the sesion thread
- String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
+ new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -213,47 +189,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+ ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
// confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
((AMQSession_0_10) getSession()).getCurrentException();
}
- @Override void notifyMessage(UnprocessedMessage messageFrame)
+ @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
- // if there is a replyto destination then we need to request the exchange info
- String replyToURL = messageFrame.getReplyToURL();
- if (replyToURL != null && !replyToURL.equals(""))
- {
- AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
- String replyToUrl = "://" + replyToURL;
- if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
- }
- else
- {
- Future<ExchangeQueryResult> future =
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- replyToUrl = res.getType() + replyToUrl;
- }
- ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
- }
+
super.notifyMessage(messageFrame);
}
@@ -267,11 +212,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getContentHeader(), messageFrame.getBodies()
- );
+ AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+ return _messageFactory.createMessage(msg.getMessageTransfer());
}
// private methods
@@ -327,7 +271,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// and messages are not prefetched we then need to request another one
if(! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -415,7 +359,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.setMessageListener(messageListener);
if (messageListener != null && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -440,7 +384,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = true;
if (_syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -463,7 +407,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (! getSession().prefetch())
@@ -486,4 +430,5 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
}
+
}