diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol')
4 files changed, 90 insertions, 122 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index ff7ce0a79d..9c012eb782 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -25,6 +25,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; @@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; @@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder +public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>, + LogSubject, AuthorizationHolder { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; + + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask = task; } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(ServerSession session, AMQConstant cause, String message) { ExecutionException ex = new ExecutionException(); ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; @@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } ex.setErrorCode(code); ex.setDescription(message); - ((ServerSession)session).invoke(ex); + session.invoke(ex); session.close(cause, message); } @@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) { closeSubscriptions(); + performDeleteTasks(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { @@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel, close(replyCode, message); } + protected void performDeleteTasks() + { + for(Action<? super ServerConnection> task : _taskList) + { + task.performAction(this); + } + } + public synchronized void block() { if(!_blocking) @@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.removeSession(ssn); } - public List<AMQSessionModel> getSessionModels() + public List<ServerSession> getSessionModels() { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + List<ServerSession> sessions = new ArrayList<ServerSession>(); for (Session ssn : getChannels()) { - sessions.add((AMQSessionModel) ssn); + sessions.add((ServerSession) ssn); } return sessions; } @@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return String.valueOf(getRemoteAddress()); } - public String getUserName() - { - return _authorizedPrincipal.getName(); - } - @Override public void closed() { + performDeleteTasks(); closeSubscriptions(); super.closed(); } @@ -522,6 +534,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override + public String getRemoteContainerName() + { + return getConnectionDelegate().getClientId(); + } + + @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); @@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return getConnectionDelegate().getClientProduct(); } - public String getPrincipalAsString() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - public long getSessionCountLimit() { return getChannelMax(); @@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.doHeartBeat(); } + + @Override + public void addDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.remove(task); + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index c85a415ce5..dc26249c61 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -310,14 +311,18 @@ public class ServerConnectionDelegate extends ServerDelegate private boolean isSessionNameUnique(final byte[] name, final Connection conn) { final ServerConnection sconn = (ServerConnection) conn; - final String userId = sconn.getUserName(); + final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal(); + final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); final Iterator<AMQConnectionModel> connections = ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator(); while(connections.hasNext()) { - final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next(); - if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name)) + final AMQConnectionModel amqConnectionModel = connections.next(); + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? "" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name)) { return false; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 0e6b4d3b08..29f9fc549e 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -88,7 +89,9 @@ import static org.apache.qpid.util.Serial.gt; public class ServerSession extends Session implements AuthorizationHolder, - AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder + AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder, + Deletable<ServerSession> + { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); @@ -132,7 +135,7 @@ public class ServerSession extends Session private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); - private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); + private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -374,7 +377,7 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Action<ServerSession> task : _taskList) + for (Action<? super ServerSession> task : _taskList) { task.performAction(this); } @@ -610,12 +613,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Action<ServerSession> task) + public void addDeleteTask(Action<? super ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Action<ServerSession> task) + public void removeDeleteTask(Action<? super ServerSession> task) { _taskList.remove(task); } @@ -652,7 +655,7 @@ public class ServerSession extends Session return _id; } - public AMQConnectionModel getConnectionModel() + public ServerConnection getConnectionModel() { return getConnection(); } @@ -922,7 +925,7 @@ public class ServerSession extends Session } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(ServerSession o) { return getId().compareTo(o.getId()); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1bd50533ed..b0a60beaf5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,8 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.Exchange; @@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + else if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { - if(queue.isExclusive()) - { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getExclusiveOwningSession() == session) - { - queue.setExclusiveOwningSession(null); - } - } - }); - - if(queue.getAuthorizationHolder() == null) - { - queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getAuthorizationHolder() == session) - { - queue.setAuthorizationHolder(null); - } - } - }); - } - } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy"); + } } } } @@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate try { - String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; final String alternateExchangeName = method.getAlternateExchange(); @@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - final boolean deleteOnNoConsumer = !exclusive && autoDelete; + arguments.put(Queue.ID, id); + arguments.put(Queue.NAME, queueName); - queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, - autoDelete, exclusive, deleteOnNoConsumer, - arguments); - - if (autoDelete && exclusive) + LifetimePolicy lifetime; + if(autoDelete) { - final AMQQueue q = queue; - final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - try - { - virtualHost.removeQueue(q); - } - catch (QpidSecurityException e) - { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END + : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS; } - if (exclusive) + else { - final AMQQueue q = queue; - final Action<ServerSession> removeExclusive = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(removeExclusive); - } - }); + lifetime = LifetimePolicy.PERMANENT; } + + arguments.put(Queue.LIFETIME_POLICY, lifetime); + + ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE; + + + arguments.put(Queue.DURABLE, method.getDurable()); + + arguments.put(Queue.EXCLUSIVE, exclusivityPolicy); + + queue = virtualHost.createQueue((ServerSession)session, arguments); + } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate result.setQueue(queue.getName()); result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); - result.setAutoDelete(queue.isAutoDelete()); + result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); Map<String, Object> arguments = new LinkedHashMap<String, Object>(); Collection<String> availableAttrs = queue.getAvailableAttributes(); @@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { setThreadSubject(session); - ServerSession serverSession = (ServerSession)session; serverSession.stopSubscriptions(); |