summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java63
1 files changed, 16 insertions, 47 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 546cc79f9e..f7e2d2df50 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -30,6 +30,9 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
- queue = _vhost.createQueue(
- UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
- name,
- isDurable,
- null,
- true,
- true,
- true,
- Collections.EMPTY_MAP);
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName()));
+ attributes.put(Queue.NAME, name);
+ attributes.put(Queue.DURABLE, isDurable);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK);
+
+ queue = _vhost.createQueue(getSession(), attributes);
}
else
{
@@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
- if(!isDurable)
- {
- final String queueName = name;
- final AMQQueue tempQueue = queue;
-
- final Action<Connection_1_0> deleteQueueTask =
- new Action<Connection_1_0>()
- {
- public void performAction(Connection_1_0 session)
- {
- if (_vhost.getQueue(queueName) == tempQueue)
- {
- try
- {
- _vhost.removeQueue(tempQueue);
- }
- catch (QpidSecurityException e)
- {
- //TODO
- _logger.error("Error removing queue", e);
- }
- }
- }
- };
-
- getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
- }
-
qd = new QueueDestination(queue);
}
catch (QpidSecurityException e)
@@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer");
throw new ConnectionScopedRuntimeException(e);
}
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy");
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
}