diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-17 20:19:36 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-17 20:19:36 +0000 |
commit | add5c695d1138bc25bb89cd0e1b1724bf542f676 (patch) | |
tree | f222dd1710831dc383ada64bc4479bcfd4c8bbc4 /java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | |
parent | e31aa33452bd54ce118078ad38d8291cc3a3d1db (diff) | |
download | qpid-python-add5c695d1138bc25bb89cd0e1b1724bf542f676.tar.gz |
Update Queue implementation to better define lifetime and exclusivity policiesjava-broker-amqp-1-0-management
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1569102 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | 63 |
1 files changed, 16 insertions, 47 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 546cc79f9e..f7e2d2df50 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,6 +30,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - queue = _vhost.createQueue( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - true, - Collections.EMPTY_MAP); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.NAME, name); + attributes.put(Queue.DURABLE, isDurable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); + + queue = _vhost.createQueue(getSession(), attributes); } else { @@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue", e); - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - qd = new QueueDestination(queue); } catch (QpidSecurityException e) @@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); throw new ConnectionScopedRuntimeException(e); } + catch (MessageSource.ConsumerAccessRefused e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"); + throw new ConnectionScopedRuntimeException(e); + } } } |