summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java408
1 files changed, 290 insertions, 118 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8a7c6b1a01..8490a724bf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,6 +17,11 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,8 +34,10 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.jms.Destination;
import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -44,18 +51,32 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
+import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -347,15 +368,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
List<Binding> bindings = new ArrayList<Binding>();
- bindings.addAll(destination.getSourceNode().getBindings());
- bindings.addAll(destination.getTargetNode().getBindings());
+ bindings.addAll(destination.getNode().getBindings());
String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
destination.getAddressName(): "amq.topic";
for (Binding binding: bindings)
{
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+ // The null check below is a way to side step that issue while fixing QPID-4146
+ // Note this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
String queue = binding.getQueue() == null?
queueName.asString(): binding.getQueue();
@@ -523,11 +551,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
- prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
+ getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
/**
@@ -558,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
rk = routingKey.toString();
}
- return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+ return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
@@ -591,10 +617,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* This method is invoked when a consumer is created
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName,
boolean nowait, int tag)
throws AMQException, FailoverException
- {
+ {
+ if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -637,11 +675,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
capacity,
Option.UNRELIABLE);
}
-
- if (!nowait)
- {
- sync();
- }
+ sync();
}
/**
@@ -653,7 +687,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
- getProtocolHandler(), producerId, immediate, mandatory);
+ producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -673,26 +707,25 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* creates an exchange if it does not already exist
*/
- public void sendExchangeDeclare(final AMQShortString name,
- final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
- throws AMQException, FailoverException
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
- sendExchangeDeclare(name.asString(), type.asString(), null, null,
- nowait);
+ //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it
+ sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
- final boolean nowait) throws AMQException
+ final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
{
getQpidSession().exchangeDeclare(
name,
type,
alternateExchange,
args,
- name.toString().startsWith("amq.") ? Option.PASSIVE
- : Option.NONE);
+ name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE);
// We need to sync so that we get notify of an error.
if (!nowait)
{
@@ -717,18 +750,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive)
- throws AMQException, FailoverException
- {
- // do nothing this is only used by 0_8
- }
-
- /**
- * Declare a queue with the given queueName
- */
- public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait, boolean passive)
+ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -759,7 +782,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- QueueNode node = (QueueNode)amqd.getSourceNode();
+ // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java
+ Node node = amqd.getNode();
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
@@ -925,12 +949,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return getCurrentException();
}
+ @Override
protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
-
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -947,7 +970,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
+ return send0_10QueueDeclare(amqd, noLocal, nowait, passive);
}
}, getAMQConnection()).execute();
}
@@ -1072,11 +1095,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
match = !result.getNotFound();
+ Node node = dest.getNode();
if (match)
{
@@ -1086,16 +1110,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(node.getExchangeType() != null &&
node.getExchangeType().equals(result.getType())) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (node.getExchangeType() != null)
- {
- // even if assert is false, better to verify this
- match = node.getExchangeType().equals(result.getType());
- if (!match)
- {
- _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
- " actual " + result.getType());
- }
}
else
{
@@ -1104,18 +1118,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setExchangeClass(new AMQShortString(result.getType()));
}
}
-
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
+ }
+
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
boolean match = true;
try
{
QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
match = dest.getAddressName().equals(result.getQueue());
-
+ Node node = dest.getNode();
+
if (match && assertNode)
{
match = (result.getDurable() == node.isDurable()) &&
@@ -1123,9 +1146,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(result.getExclusive() == node.isExclusive()) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
}
- else if (match)
+
+ if (assertNode)
{
- // should I use the queried details to update the local data structure.
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
}
}
catch(SessionException e)
@@ -1140,7 +1167,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"Error querying queue",e);
}
}
-
return match;
}
@@ -1179,17 +1205,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
@SuppressWarnings("deprecation")
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
- if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
- {
- createSubscriptionQueue(dest,noLocal);
- }
+ return;
}
else
{
@@ -1209,46 +1231,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
case AMQDestination.QUEUE_TYPE:
{
- if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ if(createNode)
{
- setLegacyFiledsForQueueType(dest);
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
break;
}
- else if(createNode)
+ else if (isQueueExist(dest,assertNode))
{
- setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,noLocal,noWait, false);
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+ setLegacyFieldsForQueueType(dest);
break;
- }
+ }
}
case AMQDestination.TOPIC_TYPE:
{
- if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ if(createNode)
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest, noLocal);
- }
+ handleExchangeNodeCreation(dest);
break;
}
- else if(createNode)
+ else if (isExchangeExist(dest,assertNode))
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- sendExchangeDeclare(dest.getAddressName(),
- dest.getExchangeClass().asString(),
- dest.getTargetNode().getAlternateExchange(),
- dest.getTargetNode().getDeclareArgs(),
- false);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest,noLocal);
- }
break;
}
}
@@ -1287,7 +1295,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throw new AMQException("Ambiguous address, please specify queue or topic as node type");
}
dest.setAddressType(type);
- dest.rebuildTargetAndSourceNodes(type);
return type;
}
}
@@ -1309,30 +1316,45 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
}
-
- private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
+
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
- QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
-
- if (dest.getQueueName() == null)
+ Link link = dest.getLink();
+ String queueName = dest.getQueueName();
+
+ if (queueName == null)
{
- if (dest.getLink() != null && dest.getLink().getName() != null)
- {
- dest.setQueueName(new AMQShortString(dest.getLink().getName()));
- }
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
+ }
+
+ SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
}
- node.setExclusive(true);
- node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,noLocal,true, false);
- getQpidSession().exchangeBind(dest.getQueueName(),
- dest.getAddressName(),
- dest.getSubject(),
- Collections.<String,Object>emptyMap());
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+
+ getQpidSession().queueDeclare(queueName,
+ queueProps.getAlternateExchange(), arguments,
+ queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ link.isDurable() ? Option.DURABLE : Option.NONE,
+ queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+ getQpidSession().exchangeBind(queueName,
+ dest.getAddressName(),
+ dest.getSubject(),
+ bindingArguments);
}
-
- public void setLegacyFiledsForQueueType(AMQDestination dest)
+
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
{
// legacy support
dest.setQueueName(new AMQShortString(dest.getAddressName()));
@@ -1345,7 +1367,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// legacy support
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+ Node node = dest.getNode();
dest.setExchangeClass(node.getExchangeType() == null?
ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
new AMQShortString(node.getExchangeType()));
@@ -1424,6 +1446,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return _qpidSession.isFlowBlocked();
}
+ @Override
+ public void setFlowControl(boolean active)
+ {
+ // Supported by 0-8..0-9-1 only
+ throw new UnsupportedOperationException("Operation not supported by this protocol");
+ }
+
private void cancelTimerTask()
{
if (flushTask != null)
@@ -1432,5 +1461,148 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
}
-}
+ private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ Node node = dest.getNode();
+ Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+ getQpidSession().queueDeclare(dest.getAddressName(),
+ node.getAlternateExchange(), arguments,
+ node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ node.isDurable() ? Option.DURABLE : Option.NONE,
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ sendExchangeDeclare(dest.getAddressName(),
+ node.getExchangeType(),
+ node.getAlternateExchange(),
+ node.getDeclareArgs(),
+ false,
+ node.isDurable(),
+ node.isAutoDelete());
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+ void createBindings(AMQDestination dest, List<Binding> bindings)
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeUnbind(queue, exchange,
+ binding.getBindingKey());
+ }
+ }
+
+ void deleteSubscriptionQueue(AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest, false))
+ {
+ getQpidSession().queueDelete(dest.getQueueName());
+ }
+ }
+
+ void handleNodeDelete(AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+ getQpidSession().exchangeDelete(dest.getAddressName());
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+ getQpidSession().queueDelete(dest.getAddressName());
+ }
+ }
+ }
+}