diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java | 96 |
1 files changed, 34 insertions, 62 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index f316d60c6a..42a3975e24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -25,23 +25,21 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeType; -import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.*; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; @@ -97,7 +95,6 @@ import org.apache.qpid.transport.TxSelect; public class ServerSessionDelegate extends SessionDelegate { - private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); private final IApplicationRegistry _appRegistry; public ServerSessionDelegate(IApplicationRegistry appRegistry) @@ -108,24 +105,16 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void command(Session session, Method method) { - try - { - setThreadSubject(session); + SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID()); - if(!session.isClosing()) + if(!session.isClosing()) + { + super.command(session, method); + if (method.isSync()) { - super.command(session, method); - if (method.isSync()) - { - session.flushProcessed(); - } + session.flushProcessed(); } } - catch(RuntimeException e) - { - LOGGER.error("Exception processing command", e); - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e); - } } @Override @@ -134,6 +123,8 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).accept(method.getTransfers()); } + + @Override public void messageReject(Session session, MessageReject method) { @@ -212,33 +203,32 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) + else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { + if(queue.isExclusive()) { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - if(queue.getAuthorizationHolder() == null) + if(queue.getPrincipalHolder() == null) { - queue.setAuthorizationHolder(s); - queue.setExclusiveOwningSession(s); + queue.setPrincipalHolder((ServerSession)session); ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() { + public void doTask(ServerSession session) { - if(queue.getAuthorizationHolder() == session) + if(queue.getPrincipalHolder() == session) { - queue.setAuthorizationHolder(null); - queue.setExclusiveOwningSession(null); + queue.setPrincipalHolder(null); } } }); } + } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -379,6 +369,7 @@ public class ServerSessionDelegate extends SessionDelegate } ssn.processed(xfr); + } @Override @@ -398,7 +389,7 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).unregister(sub); if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) { - queue.setAuthorizationHolder(null); + queue.setPrincipalHolder(null); } } } @@ -457,19 +448,6 @@ public class ServerSessionDelegate extends SessionDelegate VirtualHost virtualHost = getVirtualHost(session); Exchange exchange = getExchange(session, exchangeName); - //we must check for any unsupported arguments present and throw not-implemented - if(method.hasArguments()) - { - Map<String,Object> args = method.getArguments(); - - //QPID-3392: currently we don't support any! - if(!args.isEmpty()) - { - exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString()); - return; - } - } - if(method.getPassive()) { if(exchange == null) @@ -479,6 +457,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { + // TODO - check exchange has same properties if(!exchange.getTypeShortString().toString().equals(method.getType())) { exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); @@ -990,10 +969,10 @@ public class ServerSessionDelegate extends SessionDelegate } - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + if(method.hasAutoDelete() + && method.getAutoDelete() + && method.hasExclusive() + && method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task deleteQueueTask = new ServerSession.Task() @@ -1020,23 +999,23 @@ public class ServerSessionDelegate extends SessionDelegate } }); } - if (method.hasExclusive() - && method.getExclusive()) + else if(method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task removeExclusive = new ServerSession.Task() { + public void doTask(ServerSession session) { - q.setAuthorizationHolder(null); + q.setPrincipalHolder(null); q.setExclusiveOwningSession(null); } }; final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException { s.removeSessionCloseTask(removeExclusive); @@ -1050,7 +1029,7 @@ public class ServerSessionDelegate extends SessionDelegate } } } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session))) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1098,7 +1077,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) + if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1244,8 +1223,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void closed(Session session) { - setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) { ((ServerSession)session).unregister(sub); @@ -1264,9 +1241,4 @@ public class ServerSessionDelegate extends SessionDelegate return ((ServerSession)session).getSubscriptions(); } - private void setThreadSubject(Session session) - { - final ServerConnection scon = (ServerConnection) session.getConnection(); - SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); - } } |