summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-09-18 11:34:30 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-09-18 11:34:30 +0000
commitaf1bfee570b41fa31d2c937bf03fff3dedca85bd (patch)
treed0181b66d889c1bdba035db7af53c657fb3fa9b3
parent43fe7becfe127c13e69ea16191719759dedcb1ba (diff)
downloadqpid-python-af1bfee570b41fa31d2c937bf03fff3dedca85bd.tar.gz
added 0_10 replyTo support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@576853 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java11
6 files changed, 46 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 3bd5856df5..af4905710b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -29,6 +29,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.ExchangeQueryResult;
+import org.apache.qpidity.transport.Future;
import javax.jms.JMSException;
import java.io.IOException;
@@ -76,6 +78,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
getSession().getAMQConnection().exceptionReceived(e);
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (message.getMessageProperties().getReplyTo() != null)
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
+ }
newMessage.setContentHeader(headers);
getSession().messageReceived(newMessage);
}
@@ -111,10 +125,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
+ UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
+ messageFrame.getExchange(), messageFrame.getRoutingKey(),
+ messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
}
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 1f70663df8..2e407eabf0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -100,7 +100,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies) throws AMQException
+ List bodies, String replyToURL) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -135,7 +135,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
// todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
props.setMessageId(mprop.getMessageId());
props.setPriority((byte) devprop.getPriority());
- // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+ props.setReplyTo(replyToURL);
props.setTimestamp(devprop.getTimestamp());
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
@@ -154,11 +154,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies)
+ AMQShortString exchange, AMQShortString routingKey, List bodies,
+ String replyToURL)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
msg.setJMSRedelivered(redelivered);
return msg;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 6198f0504e..c6b22592e0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -41,7 +41,7 @@ public interface MessageFactory
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
Struct[] contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies)
+ List bodies, String replyToURL)
throws JMSException, AMQException;
AbstractJMSMessage createMessage() throws JMSException;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 13a2202e6f..24ab471f10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -67,7 +67,6 @@ public class MessageFactoryRegistry
}
-
public void registerFactory(String mimeType, MessageFactory mf)
{
if (mf == null)
@@ -122,11 +121,11 @@ public class MessageFactoryRegistry
}
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, Struct[] contentHeader, List bodies)
- throws AMQException, JMSException
+ AMQShortString routingKey, Struct[] contentHeader, List bodies,
+ String replyTo) throws AMQException, JMSException
{
MessageProperties mprop = (MessageProperties) contentHeader[0];
- String messageType = mprop.getContentType();
+ String messageType = mprop.getContentType();
if (messageType == null)
{
_logger.debug("no message type specified, building a byte message");
@@ -139,7 +138,7 @@ public class MessageFactoryRegistry
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 9aab5d094b..5cb943cd33 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -88,4 +88,10 @@ public abstract class UnprocessedMessage<H,B>
public abstract List<B> getBodies();
public abstract H getContentHeader();
+
+ // specific to 0_10
+ public String getReplyToURL()
+ {
+ return "";
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 970ba5a66a..79d829ca39 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -38,6 +38,7 @@ import org.apache.qpidity.transport.Struct;
public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
{
private Struct[] _headers;
+ private String _replyToURL;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
@@ -78,4 +79,14 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuf
return _bodies;
}
+ // additional 0_10 method
+ public String getReplyToURL()
+ {
+ return _replyToURL;
+ }
+
+ public void setReplyToURL(String url)
+ {
+ _replyToURL = url;
+ }
}