diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java | 114 |
1 files changed, 34 insertions, 80 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 5b5525643c..215e3f2f23 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() - && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -114,42 +114,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar try { - queue = createQueue(queueName, body, virtualHost, protocolConnection); - queue.setAuthorizationHolder(protocolConnection); - - if (body.getExclusive()) - { - queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); - - if(!body.getDurable()) - { - final AMQQueue q = queue; - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - q.setExclusiveOwningSession(null); - } - }; - protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void performAction(AMQQueue queue) - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } - } + queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " - + "as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); - - } - else if(queue.isAutoDelete() != body.getAutoDelete()) + else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: " - + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")"); } else if(queue.isDurable() != body.getDurable()) { @@ -211,7 +177,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return new AMQShortString("tmp_" + UUID.randomUUID()); } - protected AMQQueue createQueue(final AMQShortString queueName, + protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, QueueDeclareBody body, final VirtualHost virtualHost, final AMQProtocolSession session) @@ -222,48 +188,36 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final boolean autoDelete = body.getAutoDelete(); final boolean exclusive = body.getExclusive(); - String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; - Map<String, Object> arguments = + Map<String, Object> attributes = QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - String queueNameString = AMQShortString.toString(queueName); - final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName())); + attributes.put(Queue.DURABLE, durable); - final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, - exclusive, autoDelete, arguments); + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; - if (exclusive && !durable) + if(exclusive) { - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - if (virtualHost.getQueue(queueName.toString()) == queue) - { - try - { - virtualHost.removeQueue(queue); - } - catch (QpidSecurityException e) - { - throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e); - } - } - } - }; - - session.addSessionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - session.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; } + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + final AMQQueue queue = virtualHost.createQueue(channel, attributes); + return queue; } } |