diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | 63 |
1 files changed, 16 insertions, 47 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 546cc79f9e..f7e2d2df50 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,6 +30,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - queue = _vhost.createQueue( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - true, - Collections.EMPTY_MAP); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.NAME, name); + attributes.put(Queue.DURABLE, isDurable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); + + queue = _vhost.createQueue(getSession(), attributes); } else { @@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue", e); - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - qd = new QueueDestination(queue); } catch (QpidSecurityException e) @@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); throw new ConnectionScopedRuntimeException(e); } + catch (MessageSource.ConsumerAccessRefused e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"); + throw new ConnectionScopedRuntimeException(e); + } } } |