summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java96
1 files changed, 34 insertions, 62 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index f316d60c6a..42a3975e24 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -25,23 +25,21 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.*;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
@@ -97,7 +95,6 @@ import org.apache.qpid.transport.TxSelect;
public class ServerSessionDelegate extends SessionDelegate
{
- private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
private final IApplicationRegistry _appRegistry;
public ServerSessionDelegate(IApplicationRegistry appRegistry)
@@ -108,24 +105,16 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void command(Session session, Method method)
{
- try
- {
- setThreadSubject(session);
+ SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
- if(!session.isClosing())
+ if(!session.isClosing())
+ {
+ super.command(session, method);
+ if (method.isSync())
{
- super.command(session, method);
- if (method.isSync())
- {
- session.flushProcessed();
- }
+ session.flushProcessed();
}
}
- catch(RuntimeException e)
- {
- LOGGER.error("Exception processing command", e);
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
- }
}
@Override
@@ -134,6 +123,8 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).accept(method.getTransfers());
}
+
+
@Override
public void messageReject(Session session, MessageReject method)
{
@@ -212,33 +203,32 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+ else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
+
if(queue.isExclusive())
{
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
- if(queue.getAuthorizationHolder() == null)
+ if(queue.getPrincipalHolder() == null)
{
- queue.setAuthorizationHolder(s);
- queue.setExclusiveOwningSession(s);
+ queue.setPrincipalHolder((ServerSession)session);
((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
{
+
public void doTask(ServerSession session)
{
- if(queue.getAuthorizationHolder() == session)
+ if(queue.getPrincipalHolder() == session)
{
- queue.setAuthorizationHolder(null);
- queue.setExclusiveOwningSession(null);
+ queue.setPrincipalHolder(null);
}
}
});
}
+
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -379,6 +369,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
ssn.processed(xfr);
+
}
@Override
@@ -398,7 +389,7 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).unregister(sub);
if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
{
- queue.setAuthorizationHolder(null);
+ queue.setPrincipalHolder(null);
}
}
}
@@ -457,19 +448,6 @@ public class ServerSessionDelegate extends SessionDelegate
VirtualHost virtualHost = getVirtualHost(session);
Exchange exchange = getExchange(session, exchangeName);
- //we must check for any unsupported arguments present and throw not-implemented
- if(method.hasArguments())
- {
- Map<String,Object> args = method.getArguments();
-
- //QPID-3392: currently we don't support any!
- if(!args.isEmpty())
- {
- exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
- return;
- }
- }
-
if(method.getPassive())
{
if(exchange == null)
@@ -479,6 +457,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
+ // TODO - check exchange has same properties
if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
@@ -990,10 +969,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
- if (method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
+ if(method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -1020,23 +999,23 @@ public class ServerSessionDelegate extends SessionDelegate
}
});
}
- if (method.hasExclusive()
- && method.getExclusive())
+ else if(method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task removeExclusive = new ServerSession.Task()
{
+
public void doTask(ServerSession session)
{
- q.setAuthorizationHolder(null);
+ q.setPrincipalHolder(null);
q.setExclusiveOwningSession(null);
}
};
final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
queue.addQueueDeleteTask(new AMQQueue.Task()
{
+
public void doTask(AMQQueue queue) throws AMQException
{
s.removeSessionCloseTask(removeExclusive);
@@ -1050,7 +1029,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
}
- else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1098,7 +1077,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1244,8 +1223,6 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void closed(Session session)
{
- setThreadSubject(session);
-
for(Subscription_0_10 sub : getSubscriptions(session))
{
((ServerSession)session).unregister(sub);
@@ -1264,9 +1241,4 @@ public class ServerSessionDelegate extends SessionDelegate
return ((ServerSession)session).getSubscriptions();
}
- private void setThreadSubject(Session session)
- {
- final ServerConnection scon = (ServerConnection) session.getConnection();
- SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
- }
}