diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java | 326 |
1 files changed, 173 insertions, 153 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 541810d2fe..73ec7f1231 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -20,31 +20,78 @@ */ package org.apache.qpid.server.transport; -import org.apache.qpid.transport.*; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.*; -import org.apache.qpid.server.queue.QueueRegistry; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeInUseException; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.flow.FlowCreditManager_0_10; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.message.MessageMetaData_0_10; -import org.apache.qpid.server.subscription.Subscription_0_10; -import org.apache.qpid.server.flow.*; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.framing.*; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.nio.ByteBuffer; +import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.ExchangeBound; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeDeclare; +import org.apache.qpid.transport.ExchangeDelete; +import org.apache.qpid.transport.ExchangeQuery; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExchangeUnbind; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAccept; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquire; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCancel; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageFlush; +import org.apache.qpid.transport.MessageReject; +import org.apache.qpid.transport.MessageRejectCode; +import org.apache.qpid.transport.MessageRelease; +import org.apache.qpid.transport.MessageResume; +import org.apache.qpid.transport.MessageSetFlowMode; +import org.apache.qpid.transport.MessageStop; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.QueueDelete; +import org.apache.qpid.transport.QueuePurge; +import org.apache.qpid.transport.QueueQuery; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.TxCommit; +import org.apache.qpid.transport.TxRollback; +import org.apache.qpid.transport.TxSelect; public class ServerSessionDelegate extends SessionDelegate { @@ -58,6 +105,8 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void command(Session session, Method method) { + SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID()); + super.command(session, method); if (method.isSync()) { @@ -317,7 +366,6 @@ public class ServerSessionDelegate extends SessionDelegate catch (AMQException e) { //TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -371,19 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if (!virtualHost.getAccessManager().authoriseCreateExchange((ServerSession)session, method.getAutoDelete(), - method.getDurable(), new AMQShortString(method.getExchange()), false, false, method.getPassive(), - new AMQShortString(method.getType()))) - { - - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - String description = "permission denied: exchange-name '" + exchangeName + "'"; - - exception(session, method, errorCode, description); - - - } - else if(exchange == null) + if (exchange == null) { ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); @@ -417,12 +453,18 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); } + catch (AMQSecurityException e) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; + String description = "Permission denied: exchange-name '" + exchangeName + "'"; + + exception(session, method, errorCode, description); + } catch (AMQException e) { //TODO throw new RuntimeException(e); } - } else { @@ -478,47 +520,38 @@ public class ServerSessionDelegate extends SessionDelegate VirtualHost virtualHost = getVirtualHost(session); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - //Perform ACLs - if (!virtualHost.getAccessManager().authoriseDelete((ServerSession)session, - exchangeRegistry.getExchange(method.getExchange()))) - { - exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied"); - - } - else + try { + Exchange exchange = getExchange(session, method.getExchange()); - try + if(exchange != null && exchange.hasReferrers()) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + else { - Exchange exchange = getExchange(session, method.getExchange()); + exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); - if(exchange != null && exchange.hasReferrers()) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); - } - else + if (exchange.isDurable() && !exchange.isAutoDelete()) { - exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); - - if (exchange.isDurable() && !exchange.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - store.removeExchange(exchange); - } - + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeExchange(exchange); } } - catch (ExchangeInUseException e) - { - exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } } - + catch (ExchangeInUseException e) + { + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); + } + catch (AMQSecurityException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + method.getExchange()); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } } @Override @@ -582,13 +615,6 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); } - else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange, - queue, new AMQShortString(method.getBindingKey()))) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange() - + "' to Queue: '" + method.getQueue() - + "' not allowed"); - } else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) { exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); @@ -600,8 +626,15 @@ public class ServerSessionDelegate extends SessionDelegate if (!exchange.isBound(routingKey, fieldTable, queue)) { - virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); - + try + { + virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); + } + catch (AMQSecurityException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange() + + "' to Queue: '" + method.getQueue() + "' not allowed"); + } } else { @@ -649,7 +682,14 @@ public class ServerSessionDelegate extends SessionDelegate } else { - virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); + try + { + virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); + } + catch (AMQSecurityException e) + { + exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied"); + } } } @@ -768,25 +808,6 @@ public class ServerSessionDelegate extends SessionDelegate DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); - - if (!method.getPassive()) - { - // Perform ACL if request is not passive - - if (!virtualHost.getAccessManager().authoriseCreateQueue(((ServerSession)session), method.getAutoDelete(), method.getDurable(), - method.getExclusive(), false, method.getPassive(), new AMQShortString(queueName))) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - String description = "permission denied: queue-name '" + queueName + "'"; - - exception(session, method, errorCode, description); - - // TODO control flow - return; - } - } - - AMQQueue queue; QueueRegistry queueRegistry = getQueueRegistry(session); //TODO: do we need to check that the queue already exists with exactly the same "configuration"? @@ -879,34 +900,32 @@ public class ServerSessionDelegate extends SessionDelegate { final AMQQueue q = queue; final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { - - public void doTask(ServerSession session) { - try + public void doTask(ServerSession session) { - q.delete(); + try + { + q.delete(); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - }; + }; final ServerSession s = (ServerSession) session; s.addSessionCloseTask(deleteQueueTask); queue.addQueueDeleteTask(new AMQQueue.Task() - { - - public void doTask(AMQQueue queue) throws AMQException { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + public void doTask(AMQQueue queue) throws AMQException + { + s.removeSessionCloseTask(deleteQueueTask); + } + }); } else if(method.getExclusive()) { - { final AMQQueue q = queue; final ServerSession.Task removeExclusive = new ServerSession.Task() { @@ -928,31 +947,34 @@ public class ServerSessionDelegate extends SessionDelegate } }); } - } + } + catch (AMQSecurityException e) + { + String description = "Cannot declare queue('" + queueName + "'), permission denied"; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; + exception(session, method, errorCode, description); } catch (AMQException e) { + // TODO throw new RuntimeException(e); } } } else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session))) { - String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - + exception(session, method, errorCode, description); - + return; } - } } - protected AMQQueue createQueue(final String queueName, QueueDeclare body, VirtualHost virtualHost, @@ -963,15 +985,14 @@ public class ServerSessionDelegate extends SessionDelegate String owner = body.getExclusive() ? session.getClientID() : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - if (body.getExclusive() && !body.getDurable()) { final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { + { public void doTask(ServerSession session) { if (registry.getQueue(queueName) == queue) @@ -1006,7 +1027,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void queueDelete(Session session, QueueDelete method) { - String queueName = method.getQueue(); if(queueName == null || queueName.length()==0) { @@ -1041,36 +1061,28 @@ public class ServerSessionDelegate extends SessionDelegate else { VirtualHost virtualHost = getVirtualHost(session); - - //Perform ACLs - if (!virtualHost.getAccessManager().authoriseDelete(((ServerSession)session), queue)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot delete queue " + queueName); - } - else + + try { - try - { - int purged = queue.delete(); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - store.removeQueue(queue); - } - - } - catch (AMQException e) + queue.delete(); + if (queue.isDurable() && !queue.isAutoDelete()) { - //TODO - throw new RuntimeException(e); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeQueue(queue); } - } - + catch (AMQSecurityException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } } } } - } @Override @@ -1080,24 +1092,32 @@ public class ServerSessionDelegate extends SessionDelegate if(queueName == null || queueName.length()==0) { exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied"); - } else { AMQQueue queue = getQueue(session, queueName); - if (queue == null) { exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); } else { - //TODO - queue.clearQueue(); + try + { + queue.clearQueue(); + } + catch (AMQSecurityException e) + { + exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } } } - } @Override |