diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-21 03:14:50 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-21 03:14:50 +0000 |
commit | d9dd3f022e7479224f730f07ba4c1ba3041d06dc (patch) | |
tree | a7ce7d19b9dc2cfc35eb706ce6c6b225a37d8e90 | |
parent | 9c0c5904383d21f1ef4d76bddb56d3e2033bc13e (diff) | |
download | qpid-python-d9dd3f022e7479224f730f07ba4c1ba3041d06dc.tar.gz |
QPID-2786
Added logic to the AMQTopic class to handle the creation of the durable subscription topics using an addressing string.
Modified the subscription queue creation to handle durable subscriptions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999196 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 52 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 3a2910732f..eb9682a3cf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -125,7 +125,7 @@ public abstract class AMQDestination implements Destination, Referenceable protected final static DestSyntax defaultDestSyntax; - protected DestSyntax _destSyntax; + protected DestSyntax _destSyntax = DestSyntax.ADDR; protected AddressHelper _addrHelper; protected Address _address; @@ -928,4 +928,14 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setAddressResolved(_isAddressResolved); return dest; } + + protected void setAutoDelete(boolean b) + { + _isAutoDelete = b; + } + + protected void setDurable(boolean b) + { + _isDurable = b; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index c1021e121c..a2606bfaa8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -531,6 +531,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { rk = routingKey.toString(); } + return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); } @@ -568,7 +569,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException - { + { boolean preAcquire; long capacity = getCapacity(consumer.getDestination()); @@ -1175,7 +1176,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } - dest.setAddressResolved(true); } } @@ -1231,17 +1231,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private void createSubscriptionQueue(AMQDestination dest) throws AMQException { QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - + if (dest.getQueueName() == null) { if (dest.getLink() != null && dest.getLink().getName() != null) { dest.setQueueName(new AMQShortString(dest.getLink().getName())); } - node.setExclusive(true); - node.setAutoDelete(true); - send0_10QueueDeclare(dest,null,false,false); } + node.setExclusive(true); + node.setAutoDelete(!node.isDurable()); + send0_10QueueDeclare(dest,null,false,true); node.addBinding(new Binding(dest.getAddressName(), dest.getQueueName(),// should have one by now dest.getSubject(), @@ -1260,7 +1260,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void setLegacyFiledsForTopicType(AMQDestination dest) { // legacy support - dest.setQueueName(null); dest.setExchangeName(new AMQShortString(dest.getAddressName())); ExchangeNode node = (ExchangeNode)dest.getTargetNode(); dest.setExchangeClass(node.getExchangeType() == null? diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 2704393cc2..3f80d01811 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -27,6 +27,7 @@ import javax.jms.Topic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; import org.apache.qpid.url.BindingURL; public class AMQTopic extends AMQDestination implements Topic @@ -35,6 +36,11 @@ public class AMQTopic extends AMQDestination implements Topic { super(address); } + + public AMQTopic(Address address) throws Exception + { + super(address); + } /** * Constructor for use in creating a topic using a BindingURL. @@ -92,9 +98,32 @@ public class AMQTopic extends AMQDestination implements Topic public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, + if (topic.getDestSyntax() == DestSyntax.ADDR) + { + try + { + AMQTopic t = new AMQTopic(topic.getAddress()); + t.setQueueName(getDurableTopicQueueName(subscriptionName, connection)); + t.getSourceNode().setAutoDelete(false); + t.getSourceNode().setDurable(true); + t.setAutoDelete(false); + t.setDurable(true); + return t; + } + catch(Exception e) + { + JMSException ex = new JMSException("Error creating durable topic"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + else + { + return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, getDurableTopicQueueName(subscriptionName, connection), true); + } } public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 66766369d7..a0d29093eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -115,11 +115,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { capacity = _0_10session.getAMQConnection().getMaxPrefetch(); } - + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { - if (destination.getLink() == null || destination.getLink().getName() == null) + boolean durable = destination.getSourceNode() != null && destination.getSourceNode().isDurable(); + boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; + + if (!durable && !namedQueue) { _destination = destination.copyDestination(); _destination.setQueueName(null); |