summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java114
1 files changed, 34 insertions, 80 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 5b5525643c..215e3f2f23 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable()
- && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -114,42 +114,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
try
{
- queue = createQueue(queueName, body, virtualHost, protocolConnection);
- queue.setAuthorizationHolder(protocolConnection);
-
- if (body.getExclusive())
- {
- queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
- queue.setAuthorizationHolder(protocolConnection);
-
- if(!body.getDurable())
- {
- final AMQQueue q = queue;
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- q.setExclusiveOwningSession(null);
- }
- };
- protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>() {
- public void performAction(AMQQueue queue)
- {
- protocolConnection.removeSessionCloseTask(sessionCloseTask);
- }
- });
- }
- }
+ queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
"Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
+ queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
- + "as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
-
- }
- else if(queue.isAutoDelete() != body.getAutoDelete())
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
- + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
}
else if(queue.isDurable() != body.getDurable())
{
@@ -211,7 +177,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return new AMQShortString("tmp_" + UUID.randomUUID());
}
- protected AMQQueue createQueue(final AMQShortString queueName,
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
QueueDeclareBody body,
final VirtualHost virtualHost,
final AMQProtocolSession session)
@@ -222,48 +188,36 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final boolean autoDelete = body.getAutoDelete();
final boolean exclusive = body.getExclusive();
- String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
- Map<String, Object> arguments =
+ Map<String, Object> attributes =
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
- String queueNameString = AMQShortString.toString(queueName);
- final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+ attributes.put(Queue.DURABLE, durable);
- final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
- exclusive, autoDelete, arguments);
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
- if (exclusive && !durable)
+ if(exclusive)
{
- final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- if (virtualHost.getQueue(queueName.toString()) == queue)
- {
- try
- {
- virtualHost.removeQueue(queue);
- }
- catch (QpidSecurityException e)
- {
- throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
- }
- }
- }
- };
-
- session.addSessionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- session.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
}
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ final AMQQueue queue = virtualHost.createQueue(channel, attributes);
+
return queue;
}
}