diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-18 11:34:30 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-18 11:34:30 +0000 |
commit | af1bfee570b41fa31d2c937bf03fff3dedca85bd (patch) | |
tree | d0181b66d889c1bdba035db7af53c657fb3fa9b3 | |
parent | 43fe7becfe127c13e69ea16191719759dedcb1ba (diff) | |
download | qpid-python-af1bfee570b41fa31d2c937bf03fff3dedca85bd.tar.gz |
added 0_10 replyTo support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@576853 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 46 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3bd5856df5..af4905710b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -29,6 +29,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.qpidity.api.Message; import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.ExchangeQueryResult; +import org.apache.qpidity.transport.Future; import javax.jms.JMSException; import java.io.IOException; @@ -76,6 +78,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By getSession().getAMQConnection().exceptionReceived(e); } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; + // if there is a replyto destination then we need to request the exchange info + if (message.getMessageProperties().getReplyTo() != null) + { + Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() + .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); + ExchangeQueryResult res = future.get(); + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo() + .getExchangeName() + "/" + message.getMessageProperties().getReplyTo() + .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey(); + newMessage.setReplyToURL(replyToUrl); + } newMessage.setContentHeader(headers); getSession().messageReceived(newMessage); } @@ -111,10 +125,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( + UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception { - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), + messageFrame.getExchange(), messageFrame.getRoutingKey(), + messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL()); } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 1f70663df8..2e407eabf0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -100,7 +100,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader, AMQShortString exchange, AMQShortString routingKey, - List bodies) throws AMQException + List bodies, String replyToURL) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -135,7 +135,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders()); props.setMessageId(mprop.getMessageId()); props.setPriority((byte) devprop.getPriority()); - // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo())); + props.setReplyTo(replyToURL); props.setTimestamp(devprop.getTimestamp()); props.setType(mprop.getType()); props.setUserId(mprop.getUserId()); @@ -154,11 +154,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) + AMQShortString exchange, AMQShortString routingKey, List bodies, + String replyToURL) throws JMSException, AMQException { final AbstractJMSMessage msg = - create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL); msg.setJMSRedelivered(redelivered); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 6198f0504e..c6b22592e0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -41,7 +41,7 @@ public interface MessageFactory AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, Struct[] contentHeader, AMQShortString exchange, AMQShortString routingKey, - List bodies) + List bodies, String replyToURL) throws JMSException, AMQException; AbstractJMSMessage createMessage() throws JMSException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 13a2202e6f..24ab471f10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -67,7 +67,6 @@ public class MessageFactoryRegistry } - public void registerFactory(String mimeType, MessageFactory mf) { if (mf == null) @@ -122,11 +121,11 @@ public class MessageFactoryRegistry } public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, Struct[] contentHeader, List bodies) - throws AMQException, JMSException + AMQShortString routingKey, Struct[] contentHeader, List bodies, + String replyTo) throws AMQException, JMSException { MessageProperties mprop = (MessageProperties) contentHeader[0]; - String messageType = mprop.getContentType(); + String messageType = mprop.getContentType(); if (messageType == null) { _logger.debug("no message type specified, building a byte message"); @@ -139,7 +138,7 @@ public class MessageFactoryRegistry } else { - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 9aab5d094b..5cb943cd33 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -88,4 +88,10 @@ public abstract class UnprocessedMessage<H,B> public abstract List<B> getBodies(); public abstract H getContentHeader(); + + // specific to 0_10 + public String getReplyToURL() + { + return ""; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 970ba5a66a..79d829ca39 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -38,6 +38,7 @@ import org.apache.qpidity.transport.Struct; public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer> { private Struct[] _headers; + private String _replyToURL; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>(); @@ -78,4 +79,14 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuf return _bodies; } + // additional 0_10 method + public String getReplyToURL() + { + return _replyToURL; + } + + public void setReplyToURL(String url) + { + _replyToURL = url; + } } |