summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java127
1 files changed, 31 insertions, 96 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1bd50533ed..b0a60beaf5 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ else if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
- if(queue.isExclusive())
- {
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
-
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getExclusiveOwningSession() == session)
- {
- queue.setExclusiveOwningSession(null);
- }
- }
- });
-
- if(queue.getAuthorizationHolder() == null)
- {
- queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getAuthorizationHolder() == session)
- {
- queue.setAuthorizationHolder(null);
- }
- }
- });
- }
- }
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+ }
}
}
}
@@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate
exception(session, method, errorCode, description);
}
- else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
final String alternateExchangeName = method.getAlternateExchange();
@@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate
final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ arguments.put(Queue.ID, id);
+ arguments.put(Queue.NAME, queueName);
- queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
- autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
-
- if (autoDelete && exclusive)
+ LifetimePolicy lifetime;
+ if(autoDelete)
{
- final AMQQueue q = queue;
- final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- try
- {
- virtualHost.removeQueue(q);
- }
- catch (QpidSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+ : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
- if (exclusive)
+ else
{
- final AMQQueue q = queue;
- final Action<ServerSession> removeExclusive = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ lifetime = LifetimePolicy.PERMANENT;
}
+
+ arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+ ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+ arguments.put(Queue.DURABLE, method.getDurable());
+
+ arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+ queue = virtualHost.createQueue((ServerSession)session, arguments);
+
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate
result.setQueue(queue.getName());
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
- result.setAutoDelete(queue.isAutoDelete());
+ result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
Map<String, Object> arguments = new LinkedHashMap<String, Object>();
Collection<String> availableAttrs = queue.getAvailableAttributes();
@@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void closed(Session session)
{
setThreadSubject(session);
-
ServerSession serverSession = (ServerSession)session;
serverSession.stopSubscriptions();