summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java326
1 files changed, 173 insertions, 153 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 541810d2fe..73ec7f1231 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -20,31 +20,78 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.*;
-import org.apache.qpid.server.queue.QueueRegistry;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.flow.*;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.*;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.ExchangeBound;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeDeclare;
+import org.apache.qpid.transport.ExchangeDelete;
+import org.apache.qpid.transport.ExchangeQuery;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExchangeUnbind;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAccept;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquire;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCancel;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageFlush;
+import org.apache.qpid.transport.MessageReject;
+import org.apache.qpid.transport.MessageRejectCode;
+import org.apache.qpid.transport.MessageRelease;
+import org.apache.qpid.transport.MessageResume;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.QueueDelete;
+import org.apache.qpid.transport.QueuePurge;
+import org.apache.qpid.transport.QueueQuery;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.TxCommit;
+import org.apache.qpid.transport.TxRollback;
+import org.apache.qpid.transport.TxSelect;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -58,6 +105,8 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void command(Session session, Method method)
{
+ SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
+
super.command(session, method);
if (method.isSync())
{
@@ -317,7 +366,6 @@ public class ServerSessionDelegate extends SessionDelegate
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -371,19 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if (!virtualHost.getAccessManager().authoriseCreateExchange((ServerSession)session, method.getAutoDelete(),
- method.getDurable(), new AMQShortString(method.getExchange()), false, false, method.getPassive(),
- new AMQShortString(method.getType())))
- {
-
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
- String description = "permission denied: exchange-name '" + exchangeName + "'";
-
- exception(session, method, errorCode, description);
-
-
- }
- else if(exchange == null)
+ if (exchange == null)
{
ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
@@ -417,12 +453,18 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
}
+ catch (AMQSecurityException e)
+ {
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
+ String description = "Permission denied: exchange-name '" + exchangeName + "'";
+
+ exception(session, method, errorCode, description);
+ }
catch (AMQException e)
{
//TODO
throw new RuntimeException(e);
}
-
}
else
{
@@ -478,47 +520,38 @@ public class ServerSessionDelegate extends SessionDelegate
VirtualHost virtualHost = getVirtualHost(session);
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- //Perform ACLs
- if (!virtualHost.getAccessManager().authoriseDelete((ServerSession)session,
- exchangeRegistry.getExchange(method.getExchange())))
- {
- exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied");
-
- }
- else
+ try
{
+ Exchange exchange = getExchange(session, method.getExchange());
- try
+ if(exchange != null && exchange.hasReferrers())
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ else
{
- Exchange exchange = getExchange(session, method.getExchange());
+ exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
- if(exchange != null && exchange.hasReferrers())
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
- }
- else
+ if (exchange.isDurable() && !exchange.isAutoDelete())
{
- exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
-
- if (exchange.isDurable() && !exchange.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.removeExchange(exchange);
- }
-
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeExchange(exchange);
}
}
- catch (ExchangeInUseException e)
- {
- exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
}
-
+ catch (ExchangeInUseException e)
+ {
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
+ }
+ catch (AMQSecurityException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + method.getExchange());
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -582,13 +615,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
}
- else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange,
- queue, new AMQShortString(method.getBindingKey())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
- + "' to Queue: '" + method.getQueue()
- + "' not allowed");
- }
else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
{
exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
@@ -600,8 +626,15 @@ public class ServerSessionDelegate extends SessionDelegate
if (!exchange.isBound(routingKey, fieldTable, queue))
{
- virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
-
+ try
+ {
+ virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
+ }
+ catch (AMQSecurityException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
+ + "' to Queue: '" + method.getQueue() + "' not allowed");
+ }
}
else
{
@@ -649,7 +682,14 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
+ try
+ {
+ virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
+ }
+ catch (AMQSecurityException e)
+ {
+ exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied");
+ }
}
}
@@ -768,25 +808,6 @@ public class ServerSessionDelegate extends SessionDelegate
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
-
- if (!method.getPassive())
- {
- // Perform ACL if request is not passive
-
- if (!virtualHost.getAccessManager().authoriseCreateQueue(((ServerSession)session), method.getAutoDelete(), method.getDurable(),
- method.getExclusive(), false, method.getPassive(), new AMQShortString(queueName)))
- {
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
- String description = "permission denied: queue-name '" + queueName + "'";
-
- exception(session, method, errorCode, description);
-
- // TODO control flow
- return;
- }
- }
-
-
AMQQueue queue;
QueueRegistry queueRegistry = getQueueRegistry(session);
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
@@ -879,34 +900,32 @@ public class ServerSessionDelegate extends SessionDelegate
{
final AMQQueue q = queue;
final ServerSession.Task deleteQueueTask = new ServerSession.Task()
- {
-
- public void doTask(ServerSession session)
{
- try
+ public void doTask(ServerSession session)
{
- q.delete();
+ try
+ {
+ q.delete();
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
- };
+ };
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
queue.addQueueDeleteTask(new AMQQueue.Task()
- {
-
- public void doTask(AMQQueue queue) throws AMQException
{
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
}
else if(method.getExclusive())
{
- {
final AMQQueue q = queue;
final ServerSession.Task removeExclusive = new ServerSession.Task()
{
@@ -928,31 +947,34 @@ public class ServerSessionDelegate extends SessionDelegate
}
});
}
- }
+ }
+ catch (AMQSecurityException e)
+ {
+ String description = "Cannot declare queue('" + queueName + "'), permission denied";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
+ exception(session, method, errorCode, description);
}
catch (AMQException e)
{
+ // TODO
throw new RuntimeException(e);
}
}
}
else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
{
-
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-
+
exception(session, method, errorCode, description);
-
+
return;
}
-
}
}
-
protected AMQQueue createQueue(final String queueName,
QueueDeclare body,
VirtualHost virtualHost,
@@ -963,15 +985,14 @@ public class ServerSessionDelegate extends SessionDelegate
String owner = body.getExclusive() ? session.getClientID() : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
body.getExclusive(), virtualHost, body.getArguments());
-
if (body.getExclusive() && !body.getDurable())
{
final ServerSession.Task deleteQueueTask =
new ServerSession.Task()
- {
+ {
public void doTask(ServerSession session)
{
if (registry.getQueue(queueName) == queue)
@@ -1006,7 +1027,6 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void queueDelete(Session session, QueueDelete method)
{
-
String queueName = method.getQueue();
if(queueName == null || queueName.length()==0)
{
@@ -1041,36 +1061,28 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
VirtualHost virtualHost = getVirtualHost(session);
-
- //Perform ACLs
- if (!virtualHost.getAccessManager().authoriseDelete(((ServerSession)session), queue))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot delete queue " + queueName);
- }
- else
+
+ try
{
- try
- {
- int purged = queue.delete();
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.removeQueue(queue);
- }
-
- }
- catch (AMQException e)
+ queue.delete();
+ if (queue.isDurable() && !queue.isAutoDelete())
{
- //TODO
- throw new RuntimeException(e);
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeQueue(queue);
}
-
}
-
+ catch (AMQSecurityException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
}
}
-
}
@Override
@@ -1080,24 +1092,32 @@ public class ServerSessionDelegate extends SessionDelegate
if(queueName == null || queueName.length()==0)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
-
}
else
{
AMQQueue queue = getQueue(session, queueName);
-
if (queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
}
else
{
- //TODO
- queue.clearQueue();
+ try
+ {
+ queue.clearQueue();
+ }
+ catch (AMQSecurityException e)
+ {
+ exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
}
-
}
@Override