summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-09-21 03:14:50 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-09-21 03:14:50 +0000
commitd9dd3f022e7479224f730f07ba4c1ba3041d06dc (patch)
treea7ce7d19b9dc2cfc35eb706ce6c6b225a37d8e90
parent9c0c5904383d21f1ef4d76bddb56d3e2033bc13e (diff)
downloadqpid-python-d9dd3f022e7479224f730f07ba4c1ba3041d06dc.tar.gz
QPID-2786
Added logic to the AMQTopic class to handle the creation of the durable subscription topics using an addressing string. Modified the subscription queue creation to handle durable subscriptions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999196 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java7
4 files changed, 52 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 3a2910732f..eb9682a3cf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -125,7 +125,7 @@ public abstract class AMQDestination implements Destination, Referenceable
protected final static DestSyntax defaultDestSyntax;
- protected DestSyntax _destSyntax;
+ protected DestSyntax _destSyntax = DestSyntax.ADDR;
protected AddressHelper _addrHelper;
protected Address _address;
@@ -928,4 +928,14 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setAddressResolved(_isAddressResolved);
return dest;
}
+
+ protected void setAutoDelete(boolean b)
+ {
+ _isAutoDelete = b;
+ }
+
+ protected void setDurable(boolean b)
+ {
+ _isDurable = b;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c1021e121c..a2606bfaa8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -531,6 +531,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
rk = routingKey.toString();
}
+
return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
}
@@ -568,7 +569,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector, int tag)
throws AMQException, FailoverException
- {
+ {
boolean preAcquire;
long capacity = getCapacity(consumer.getDestination());
@@ -1175,7 +1176,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
-
dest.setAddressResolved(true);
}
}
@@ -1231,17 +1231,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private void createSubscriptionQueue(AMQDestination dest) throws AMQException
{
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
-
+
if (dest.getQueueName() == null)
{
if (dest.getLink() != null && dest.getLink().getName() != null)
{
dest.setQueueName(new AMQShortString(dest.getLink().getName()));
}
- node.setExclusive(true);
- node.setAutoDelete(true);
- send0_10QueueDeclare(dest,null,false,false);
}
+ node.setExclusive(true);
+ node.setAutoDelete(!node.isDurable());
+ send0_10QueueDeclare(dest,null,false,true);
node.addBinding(new Binding(dest.getAddressName(),
dest.getQueueName(),// should have one by now
dest.getSubject(),
@@ -1260,7 +1260,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void setLegacyFiledsForTopicType(AMQDestination dest)
{
// legacy support
- dest.setQueueName(null);
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
ExchangeNode node = (ExchangeNode)dest.getTargetNode();
dest.setExchangeClass(node.getExchangeType() == null?
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 2704393cc2..3f80d01811 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -27,6 +27,7 @@ import javax.jms.Topic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
public class AMQTopic extends AMQDestination implements Topic
@@ -35,6 +36,11 @@ public class AMQTopic extends AMQDestination implements Topic
{
super(address);
}
+
+ public AMQTopic(Address address) throws Exception
+ {
+ super(address);
+ }
/**
* Constructor for use in creating a topic using a BindingURL.
@@ -92,9 +98,32 @@ public class AMQTopic extends AMQDestination implements Topic
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
+ if (topic.getDestSyntax() == DestSyntax.ADDR)
+ {
+ try
+ {
+ AMQTopic t = new AMQTopic(topic.getAddress());
+ t.setQueueName(getDurableTopicQueueName(subscriptionName, connection));
+ t.getSourceNode().setAutoDelete(false);
+ t.getSourceNode().setDurable(true);
+ t.setAutoDelete(false);
+ t.setDurable(true);
+ return t;
+ }
+ catch(Exception e)
+ {
+ JMSException ex = new JMSException("Error creating durable topic");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+ else
+ {
+ return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
getDurableTopicQueueName(subscriptionName, connection),
true);
+ }
}
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 66766369d7..a0d29093eb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -115,11 +115,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
capacity = _0_10session.getAMQConnection().getMaxPrefetch();
}
-
+
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
- if (destination.getLink() == null || destination.getLink().getName() == null)
+ boolean durable = destination.getSourceNode() != null && destination.getSourceNode().isDurable();
+ boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
+
+ if (!durable && !namedQueue)
{
_destination = destination.copyDestination();
_destination.setQueueName(null);