summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-03-01 15:50:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-03-01 15:50:27 +0000
commit92be7e8f3163c048a8642d2deeaa921bbb65dc9c (patch)
tree5c0fa12fa0787d8870eab113469f74ccfb07a5be /qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
parente78f6a9e59098ac104892a79b74c9895272b292e (diff)
downloadqpid-python-92be7e8f3163c048a8642d2deeaa921bbb65dc9c.tar.gz
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295635 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java72
1 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 1e36119ce8..bde756dd03 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.url.AMQBindingURL;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -85,7 +77,6 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
-
private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -450,7 +441,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
}
- // TODO - ReplyTo
+ if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ {
+ String origReplyToString = properties.getReplyTo().asString();
+ ReplyTo replyTo = new ReplyTo();
+ // if the string looks like a binding URL, then attempt to parse it...
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+ AMQShortString routingKey = burl.getRoutingKey();
+ if(routingKey != null)
+ {
+ replyTo.setRoutingKey(routingKey.asString());
+ }
+
+ AMQShortString exchangeName = burl.getExchangeName();
+ if(exchangeName != null)
+ {
+ replyTo.setExchange(exchangeName.asString());
+ }
+ }
+ catch (URISyntaxException e)
+ {
+ replyTo.setRoutingKey(origReplyToString);
+ }
+ messageProps.setReplyTo(replyTo);
+
+ }
+
+ if(properties.getMessageId() != null)
+ {
+ try
+ {
+ String messageIdAsString = properties.getMessageIdAsString();
+ if(messageIdAsString.startsWith("ID:"))
+ {
+ messageIdAsString = messageIdAsString.substring(3);
+ }
+ UUID uuid = UUID.fromString(messageIdAsString);
+ messageProps.setMessageId(uuid);
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore - can't parse
+ }
+ }
+
+
if(properties.getUserId() != null)
{
@@ -459,7 +496,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
FieldTable fieldTable = properties.getHeaders();
- final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+ Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+ if(properties.getType() != null)
+ {
+ appHeaders.put("x-jms-type", properties.getTypeAsString());
+ }
messageProps.setApplicationHeaders(appHeaders);