diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 15:50:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 15:50:27 +0000 |
commit | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (patch) | |
tree | 5c0fa12fa0787d8870eab113469f74ccfb07a5be /qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | |
parent | e78f6a9e59098ac104892a79b74c9895272b292e (diff) | |
download | qpid-python-92be7e8f3163c048a8642d2deeaa921bbb65dc9c.tar.gz |
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295635 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | 72 |
1 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 1e36119ce8..bde756dd03 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.url.AMQBindingURL; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -85,7 +77,6 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { - private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -450,7 +441,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); } - // TODO - ReplyTo + if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + { + String origReplyToString = properties.getReplyTo().asString(); + ReplyTo replyTo = new ReplyTo(); + // if the string looks like a binding URL, then attempt to parse it... + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + } + catch (URISyntaxException e) + { + replyTo.setRoutingKey(origReplyToString); + } + messageProps.setReplyTo(replyTo); + + } + + if(properties.getMessageId() != null) + { + try + { + String messageIdAsString = properties.getMessageIdAsString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + UUID uuid = UUID.fromString(messageIdAsString); + messageProps.setMessageId(uuid); + } + catch(IllegalArgumentException e) + { + // ignore - can't parse + } + } + + if(properties.getUserId() != null) { @@ -459,7 +496,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr FieldTable fieldTable = properties.getHeaders(); - final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + + if(properties.getType() != null) + { + appHeaders.put("x-jms-type", properties.getTypeAsString()); + } messageProps.setApplicationHeaders(appHeaders); |