diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java | 127 |
1 files changed, 31 insertions, 96 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1bd50533ed..b0a60beaf5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,8 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.Exchange; @@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + else if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { - if(queue.isExclusive()) - { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getExclusiveOwningSession() == session) - { - queue.setExclusiveOwningSession(null); - } - } - }); - - if(queue.getAuthorizationHolder() == null) - { - queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getAuthorizationHolder() == session) - { - queue.setAuthorizationHolder(null); - } - } - }); - } - } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy"); + } } } } @@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate try { - String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; final String alternateExchangeName = method.getAlternateExchange(); @@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - final boolean deleteOnNoConsumer = !exclusive && autoDelete; + arguments.put(Queue.ID, id); + arguments.put(Queue.NAME, queueName); - queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, - autoDelete, exclusive, deleteOnNoConsumer, - arguments); - - if (autoDelete && exclusive) + LifetimePolicy lifetime; + if(autoDelete) { - final AMQQueue q = queue; - final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - try - { - virtualHost.removeQueue(q); - } - catch (QpidSecurityException e) - { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END + : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS; } - if (exclusive) + else { - final AMQQueue q = queue; - final Action<ServerSession> removeExclusive = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(removeExclusive); - } - }); + lifetime = LifetimePolicy.PERMANENT; } + + arguments.put(Queue.LIFETIME_POLICY, lifetime); + + ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE; + + + arguments.put(Queue.DURABLE, method.getDurable()); + + arguments.put(Queue.EXCLUSIVE, exclusivityPolicy); + + queue = virtualHost.createQueue((ServerSession)session, arguments); + } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate result.setQueue(queue.getName()); result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); - result.setAutoDelete(queue.isAutoDelete()); + result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); Map<String, Object> arguments = new LinkedHashMap<String, Object>(); Collection<String> availableAttrs = queue.getAvailableAttributes(); @@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { setThreadSubject(session); - ServerSession serverSession = (ServerSession)session; serverSession.stopSubscriptions(); |