diff options
Diffstat (limited to 'java/broker-plugins')
23 files changed, 475 insertions, 645 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index ff7ce0a79d..9c012eb782 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -25,6 +25,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; @@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; @@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder +public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>, + LogSubject, AuthorizationHolder { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; + + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask = task; } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(ServerSession session, AMQConstant cause, String message) { ExecutionException ex = new ExecutionException(); ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; @@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } ex.setErrorCode(code); ex.setDescription(message); - ((ServerSession)session).invoke(ex); + session.invoke(ex); session.close(cause, message); } @@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) { closeSubscriptions(); + performDeleteTasks(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { @@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel, close(replyCode, message); } + protected void performDeleteTasks() + { + for(Action<? super ServerConnection> task : _taskList) + { + task.performAction(this); + } + } + public synchronized void block() { if(!_blocking) @@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.removeSession(ssn); } - public List<AMQSessionModel> getSessionModels() + public List<ServerSession> getSessionModels() { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + List<ServerSession> sessions = new ArrayList<ServerSession>(); for (Session ssn : getChannels()) { - sessions.add((AMQSessionModel) ssn); + sessions.add((ServerSession) ssn); } return sessions; } @@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return String.valueOf(getRemoteAddress()); } - public String getUserName() - { - return _authorizedPrincipal.getName(); - } - @Override public void closed() { + performDeleteTasks(); closeSubscriptions(); super.closed(); } @@ -522,6 +534,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override + public String getRemoteContainerName() + { + return getConnectionDelegate().getClientId(); + } + + @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); @@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return getConnectionDelegate().getClientProduct(); } - public String getPrincipalAsString() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - public long getSessionCountLimit() { return getChannelMax(); @@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.doHeartBeat(); } + + @Override + public void addDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.remove(task); + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index c85a415ce5..dc26249c61 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -310,14 +311,18 @@ public class ServerConnectionDelegate extends ServerDelegate private boolean isSessionNameUnique(final byte[] name, final Connection conn) { final ServerConnection sconn = (ServerConnection) conn; - final String userId = sconn.getUserName(); + final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal(); + final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); final Iterator<AMQConnectionModel> connections = ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator(); while(connections.hasNext()) { - final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next(); - if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name)) + final AMQConnectionModel amqConnectionModel = connections.next(); + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? "" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name)) { return false; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 0e6b4d3b08..29f9fc549e 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -88,7 +89,9 @@ import static org.apache.qpid.util.Serial.gt; public class ServerSession extends Session implements AuthorizationHolder, - AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder + AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder, + Deletable<ServerSession> + { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); @@ -132,7 +135,7 @@ public class ServerSession extends Session private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); - private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); + private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -374,7 +377,7 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Action<ServerSession> task : _taskList) + for (Action<? super ServerSession> task : _taskList) { task.performAction(this); } @@ -610,12 +613,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Action<ServerSession> task) + public void addDeleteTask(Action<? super ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Action<ServerSession> task) + public void removeDeleteTask(Action<? super ServerSession> task) { _taskList.remove(task); } @@ -652,7 +655,7 @@ public class ServerSession extends Session return _id; } - public AMQConnectionModel getConnectionModel() + public ServerConnection getConnectionModel() { return getConnection(); } @@ -922,7 +925,7 @@ public class ServerSession extends Session } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(ServerSession o) { return getId().compareTo(o.getId()); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1bd50533ed..b0a60beaf5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,8 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.Exchange; @@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + else if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { - if(queue.isExclusive()) - { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getExclusiveOwningSession() == session) - { - queue.setExclusiveOwningSession(null); - } - } - }); - - if(queue.getAuthorizationHolder() == null) - { - queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - if(queue.getAuthorizationHolder() == session) - { - queue.setAuthorizationHolder(null); - } - } - }); - } - } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy"); + } } } } @@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate try { - String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; final String alternateExchangeName = method.getAlternateExchange(); @@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - final boolean deleteOnNoConsumer = !exclusive && autoDelete; + arguments.put(Queue.ID, id); + arguments.put(Queue.NAME, queueName); - queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, - autoDelete, exclusive, deleteOnNoConsumer, - arguments); - - if (autoDelete && exclusive) + LifetimePolicy lifetime; + if(autoDelete) { - final AMQQueue q = queue; - final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - try - { - virtualHost.removeQueue(q); - } - catch (QpidSecurityException e) - { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END + : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS; } - if (exclusive) + else { - final AMQQueue q = queue; - final Action<ServerSession> removeExclusive = new Action<ServerSession>() - { - public void performAction(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(removeExclusive); - } - }); + lifetime = LifetimePolicy.PERMANENT; } + + arguments.put(Queue.LIFETIME_POLICY, lifetime); + + ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE; + + + arguments.put(Queue.DURABLE, method.getDurable()); + + arguments.put(Queue.EXCLUSIVE, exclusivityPolicy); + + queue = virtualHost.createQueue((ServerSession)session, arguments); + } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate result.setQueue(queue.getName()); result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); - result.setAutoDelete(queue.isAutoDelete()); + result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); Map<String, Object> arguments = new LinkedHashMap<String, Object>(); Collection<String> availableAttrs = queue.getAvailableAttributes(); @@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { setThreadSubject(session); - ServerSession serverSession = (ServerSession)session; serverSession.stopSubscriptions(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b15b3f0bfa..fe1cb624e5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -30,6 +31,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; @@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel<T extends AMQProtocolSession<T>> + implements AMQSessionModel<AMQChannel<T>,T>, + AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AMQProtocolSession _session; + private final T _session; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + private final List<Action<? super AMQChannel<T>>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, MessageStore messageStore) throws AMQException { _session = session; @@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException + MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused { if (tag == null) { @@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { filterManager = new SimpleFilterManager(); } - filterManager.add(new FilterSupport.NoLocalFilter(source)); + final Object connectionReference = getConnectionReference(); + filterManager.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + return message.getConnectionReference() != connectionReference; + } + }); } Consumer sub = source.addConsumer(target, @@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _tag2SubscriptionTargetMap.remove(tag); throw e; } + catch (MessageSource.ConsumerAccessRefused e) + { + _tag2SubscriptionTargetMap.remove(tag); + throw e; + } return tag; } @@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F CurrentActor.get().message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); + + for (Action<? super AMQChannel<T>> task : _taskList) + { + task.performAction(this); + } + + _transaction.rollback(); try @@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F Consumer sub = me.getValue().getConsumer(); - - sub.close(); - + if(sub != null) + { + sub.close(); + } } _tag2SubscriptionTargetMap.clear(); @@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _id; } - public AMQConnectionModel getConnectionModel() + @Override + public T getConnectionModel() { return _session; } @@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(AMQChannel o) { return getId().compareTo(o.getId()); } + @Override + public void addDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.remove(task); + } + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 5e95701e5a..68f1ad7942 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; -import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogActor; @@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private VirtualHost _virtualHost; - private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = + new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + @SuppressWarnings("unchecked") + private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>(); - - private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); + private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = + new HashSet<AMQChannel<AMQProtocolEngine>>(); private final AMQStateManager _stateManager; @@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private SaslServer _saslServer; - private Object _lastReceived; - - private Object _lastSent; - private volatile boolean _closed; // maximum number of channels this session should have @@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); - private FieldTable _clientProperties; - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<? super AMQProtocolEngine>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; @@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastIoTime; private long _writtenBytes; - private long _readBytes; - private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private long _createTime = System.currentTimeMillis(); private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; + private long _readBytes; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + _readBytes += msg.remaining(); _receivedLock.lock(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - final int len = dataBlocks.size(); - for (int i = 0; i < len; i++) + for (AMQDataBlock dataBlock : dataBlocks) { - AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void receivedComplete() { - for (AMQChannel channel : _channelsForCurrentMessage) + for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage) { channel.receivedComplete(); } @@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ private void dataBlockReceived(AMQDataBlock message) throws Exception { - _lastReceived = message; if (message instanceof ProtocolInitiation) { protocolInitiationReceived((ProtocolInitiation) message); @@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); + AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId); if(amqChannel != null) { // The _receivedLock is already acquired in the caller @@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt); @@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (Exception e) { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _logger.error("Unexpected exception while processing frame. Closing connection.", e); closeProtocolSession(); @@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentHeader(body); @@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentBody(body); } @@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _contextKey = contextKey; } - public List<AMQChannel> getChannels() + public List<AMQChannel<AMQProtocolEngine>> getChannels() { synchronized (_channelMap) { - return new ArrayList<AMQChannel>(_channelMap.values()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values()); } } - public AMQChannel getAndAssertChannel(int channelId) throws AMQException + public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException { - AMQChannel channel = getChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); @@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return channel; } - public AMQChannel getChannel(int channelId) + public AMQChannel<AMQProtocolEngine> getChannel(int channelId) { - final AMQChannel channel = + final AMQChannel<AMQProtocolEngine> channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if (_closed) { @@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel channel) throws AMQException + public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void rollbackTransactions(AMQChannel channel) throws AMQException + public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel channel = getChannel(channelId); + final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); @@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() { - for (AMQChannel channel : getChannels()) + for (AMQChannel<AMQProtocolEngine> channel : getChannels()) { channel.close(); } @@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeAllChannels(); - for (Task task : _taskList) + for (Action<? super AMQProtocolEngine> task : _taskList) { - task.doTask(this); + task.performAction(this); } synchronized(this) @@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (InterruptedException e) { - + // do nothing } finally { @@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - /** @return an object that can be used to identity */ public Object getKey() { @@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setClientProperties(FieldTable clientProperties) { - _clientProperties = clientProperties; - if (_clientProperties != null) + if (clientProperties != null) { - String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); if (closeWhenNoRoute != null) { _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute); @@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); - _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT); + _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); + _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); - String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); + String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); if (clientId != null) { setContextKey(new AMQShortString(clientId)); @@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _protocolVersion.getMinorVersion(); } - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - public MethodRegistry getRegistry() { return getMethodRegistry(); @@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } - public void addSessionCloseTask(Task task) + public void addDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.remove(task); } @@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _clientProduct; } - public String getPrincipalAsString() - { - return getAuthId(); - } - public long getSessionCountLimit() { return getMaximumNumberOfChannels(); } - public Boolean isIncoming() - { - return true; - } - - public Boolean isSystemConnection() - { - return false; - } - - public Boolean isFederationLink() - { - return false; - } - - public String getAuthId() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - - public Integer getRemotePID() - { - return null; - } - - public String getRemoteProcessName() - { - return null; - } - - public Integer getRemoteParentPID() - { - return null; - } - public boolean isDurable() { return false; @@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public long getCreateTime() - { - return _createTime; - } - - public Boolean isShadow() - { - return false; - } - - public void mgmtClose() - { - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseBody responseBody = - methodRegistry.createConnectionCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The connection was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use closeConnection in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(0)); - - closeSession(); - - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void mgmtCloseChannel(int channelId) { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public String getClientID() - { - return getContextKey().toString(); - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) { - int channelId = ((AMQChannel)session).getChannelId(); + int channelId = session.getChannelId(); closeChannel(channelId, cause, message); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null)); + null)); } public void block() @@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(!_blocking) { _blocking = true; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.block(); } @@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(_blocking) { _blocking = false; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.unblock(); } @@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closed; } - public List<AMQSessionModel> getSessionModels() + public List<AMQChannel<AMQProtocolEngine>> getSessionModels() { - return new ArrayList<AMQSessionModel>(getChannels()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels()); } public LogSubject getLogSubject() @@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } - public void setDeferFlush(boolean deferFlush) + @Override + public String getRemoteContainerName() { - _deferFlush = deferFlush; + return String.valueOf(getContextKey()); } - public String getUserName() + public void setDeferFlush(boolean deferFlush) { - return getAuthorizedPrincipal().getName(); + _deferFlush = deferFlush; } public final class WriteDeliverMethod diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 58a3b5df12..045367b667 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel +public interface AMQProtocolSession<T extends AMQProtocolSession<T>> + extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>> { long getSessionID(); @@ -69,11 +71,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth */ SocketAddress getLocalAddress(); - public static interface Task - { - public void doTask(AMQProtocolSession session); - } - /** * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC * 6). @@ -98,7 +95,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * * @return null if no channel exists, the channel otherwise */ - AMQChannel getChannel(int channelId); + AMQChannel<T> getChannel(int channelId); /** * Associate a channel with this session. @@ -106,7 +103,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * @param channel the channel to associate with this session. It is an error to associate the same channel with more * than one session but this is not validated. */ - void addChannel(AMQChannel channel) throws AMQException; + void addChannel(AMQChannel<T> channel) throws AMQException; /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue @@ -185,10 +182,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setVirtualHost(VirtualHost virtualHost) throws AMQException; - void addSessionCloseTask(Task task); - - void removeSessionCloseTask(Task task); - public ProtocolOutputConverter getProtocolOutputConverter(); void setAuthorizedSubject(Subject authorizedSubject); @@ -209,11 +202,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setMaximumNumberOfChannels(Long value); - void commitTransactions(AMQChannel channel) throws AMQException; + void commitTransactions(AMQChannel<T> channel) throws AMQException; - void rollbackTransactions(AMQChannel channel) throws AMQException; + void rollbackTransactions(AMQChannel<T> channel) throws AMQException; - List<AMQChannel> getChannels(); + List<AMQChannel<T>> getChannels(); void mgmtCloseChannel(int channelId); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ce90de7aac..c93c164978 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { final AMQShortString consumerTagName; - if (queue.isExclusive() && !queue.isDurable()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); - } - } - if (body.getConsumerTag() != null) { consumerTagName = body.getConsumerTag().intern(false); @@ -184,6 +174,13 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic + queue.getName() + " permission denied"); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an incompatible exclusivity policy"); + } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index c9a7cc69a1..43700049e1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -98,15 +98,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - if (queue.isExclusive()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection."); - } - } try { @@ -136,6 +127,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB "The GET request has been evaluated as an exclusive consumer, " + "this is likely due to a programming error in the Qpid broker"); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an incompatible exclusivit policy"); + } } } } @@ -145,7 +141,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final AMQChannel channel, final boolean acks) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer + MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused { final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 401718db88..9b875ccf39 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -116,15 +116,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> try { - if (queue.isExclusive() && !queue.isDurable()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); - } - } Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); String bindingKey = String.valueOf(routingKey); @@ -144,10 +135,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } } } - catch (AMQException e) - { - throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); - } catch (QpidSecurityException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 5b5525643c..215e3f2f23 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() - && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -114,42 +114,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar try { - queue = createQueue(queueName, body, virtualHost, protocolConnection); - queue.setAuthorizationHolder(protocolConnection); - - if (body.getExclusive()) - { - queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); - - if(!body.getDurable()) - { - final AMQQueue q = queue; - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - q.setExclusiveOwningSession(null); - } - }; - protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void performAction(AMQQueue queue) - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } - } + queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " - + "as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); - - } - else if(queue.isAutoDelete() != body.getAutoDelete()) + else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: " - + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")"); } else if(queue.isDurable() != body.getDurable()) { @@ -211,7 +177,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return new AMQShortString("tmp_" + UUID.randomUUID()); } - protected AMQQueue createQueue(final AMQShortString queueName, + protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, QueueDeclareBody body, final VirtualHost virtualHost, final AMQProtocolSession session) @@ -222,48 +188,36 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final boolean autoDelete = body.getAutoDelete(); final boolean exclusive = body.getExclusive(); - String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; - Map<String, Object> arguments = + Map<String, Object> attributes = QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - String queueNameString = AMQShortString.toString(queueName); - final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName())); + attributes.put(Queue.DURABLE, durable); - final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, - exclusive, autoDelete, arguments); + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; - if (exclusive && !durable) + if(exclusive) { - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - if (virtualHost.getQueue(queueName.toString()) == queue) - { - try - { - virtualHost.removeQueue(queue); - } - catch (QpidSecurityException e) - { - throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e); - } - } - } - }; - - session.addSessionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - session.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; } + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + final AMQQueue queue = virtualHost.createQueue(channel, attributes); + return queue; } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 3a9a6dc44e..c939e49aab 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -105,8 +105,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } else { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index 6d8f8e64fc..9d035a3f57 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -96,9 +96,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod } else { - AMQSessionModel session = queue.getExclusiveOwningSession(); - - if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue is exclusive, but not created on this Connection."); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 7661d98cb4..f8888f985e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -223,7 +223,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(AMQChannel session, AMQConstant cause, String message) { super.closeSession(session, cause, message); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cae61f9d80..7f8237cc85 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.security.Principal; import java.text.MessageFormat; import java.util.Collection; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -42,7 +43,7 @@ import java.util.List; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -public class Connection_1_0 implements ConnectionEventListener +public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { private final Port _port; @@ -53,8 +54,33 @@ public class Connection_1_0 implements ConnectionEventListener private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); - private List<Action<Connection_1_0>> _closeTasks = - Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>()); + + private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); + private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); + + private final LogSubject _logSubject = new LogSubject() + { + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getRemoteAddressString(), + _vhost.getName()) + + "] "; + + } + }; + + private volatile boolean _stopped; + + + private List<Action<? super Connection_1_0>> _closeTasks = + Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); @@ -69,7 +95,7 @@ public class Connection_1_0 implements ConnectionEventListener _transport = transport; _conn = conn; _connectionId = connectionId; - _vhost.getConnectionRegistry().registerConnection(_model); + _vhost.getConnectionRegistry().registerConnection(this); } @@ -80,7 +106,7 @@ public class Connection_1_0 implements ConnectionEventListener public void remoteSessionCreation(SessionEndpoint endpoint) { - Session_1_0 session = new Session_1_0(_vhost, this); + Session_1_0 session = new Session_1_0(_vhost, this, endpoint); _sessions.add(session); endpoint.setSessionEventListener(session); } @@ -90,24 +116,24 @@ public class Connection_1_0 implements ConnectionEventListener _sessions.remove(session); } - void removeConnectionCloseTask(final Action<Connection_1_0> task) + public void removeDeleteTask(final Action<? super Connection_1_0> task) { _closeTasks.remove( task ); } - void addConnectionCloseTask(final Action<Connection_1_0> task) + public void addDeleteTask(final Action<? super Connection_1_0> task) { _closeTasks.add( task ); } public void closeReceived() { - List<Action<Connection_1_0>> taskCopy; + List<Action<? super Connection_1_0>> taskCopy; synchronized (_closeTasks) { - taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks); + taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks); } - for(Action<Connection_1_0> task : taskCopy) + for(Action<? super Connection_1_0> task : taskCopy) { task.performAction(this); } @@ -115,7 +141,7 @@ public class Connection_1_0 implements ConnectionEventListener { _closeTasks.clear(); } - _vhost.getConnectionRegistry().deregisterConnection(_model); + _vhost.getConnectionRegistry().deregisterConnection(this); } @@ -125,30 +151,6 @@ public class Connection_1_0 implements ConnectionEventListener closeReceived(); } - private final AMQConnectionModel _model = new AMQConnectionModel() - { - private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); - private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); - - private final LogSubject _logSubject = new LogSubject() - { - @Override - public String toLogString() - { - return "[" + - MessageFormat.format(CONNECTION_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString(), - _vhost.getName()) - + "] "; - - } - }; - - private volatile boolean _stopped; @Override public void close(AMQConstant cause, String message) @@ -169,9 +171,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(Session_1_0 session, AMQConstant cause, String message) { - // TODO + session.close(cause, message); } @Override @@ -181,9 +183,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public List<AMQSessionModel> getSessionModels() + public List<Session_1_0> getSessionModels() { - return new ArrayList<AMQSessionModel>(_sessions); + return new ArrayList<Session_1_0>(_sessions); } @Override @@ -193,12 +195,6 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public String getUserName() - { - return getPrincipalAsString(); - } - - @Override public boolean isSessionNameUnique(byte[] name) { return true; // TODO @@ -216,7 +212,13 @@ public class Connection_1_0 implements ConnectionEventListener return _conn.getRemoteContainerId(); } - @Override + @Override + public String getRemoteContainerName() + { + return _conn.getRemoteContainerId(); + } + + @Override public String getClientVersion() { return ""; //TODO @@ -228,10 +230,9 @@ public class Connection_1_0 implements ConnectionEventListener return ""; //TODO } - @Override - public String getPrincipalAsString() + public Principal getAuthorizedPrincipal() { - return String.valueOf(_conn.getUser()); + return _conn.getUser(); } @Override @@ -337,11 +338,10 @@ public class Connection_1_0 implements ConnectionEventListener } - }; AMQConnectionModel getModel() { - return _model; + return this; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 546cc79f9e..f7e2d2df50 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,6 +30,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - queue = _vhost.createQueue( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - true, - Collections.EMPTY_MAP); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.NAME, name); + attributes.put(Queue.DURABLE, isDurable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); + + queue = _vhost.createQueue(getSession(), attributes); } else { @@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue", e); - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - qd = new QueueDestination(queue); } catch (QpidSecurityException e) @@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); throw new ConnectionScopedRuntimeException(e); } + catch (MessageSource.ConsumerAccessRefused e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"); + throw new ConnectionScopedRuntimeException(e); + } } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index c055d1e840..6840c7344a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -26,8 +26,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.LifetimePolicy; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; @@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; @@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject +public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject { private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + private final SessionEndpoint _endpoint; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = new LinkedHashMap<Integer, ServerTransaction>(); + + private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList = + new CopyOnWriteArrayList<Action<? super Session_1_0>>(); + private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); + private AtomicBoolean _closed = new AtomicBoolean(); - public Session_1_0(VirtualHost vhost, final Connection_1_0 connection) + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) { _vhost = vhost; + _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; @@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); + attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); + attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); - final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - false, - properties); - - - - if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) + if(lifetimePolicy instanceof DeleteOnNoLinks) { - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue from vhost", e); - } - } - } - }; - - _connection.addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - _connection.removeConnectionCloseTask(deleteQueueTask); - } - - - }); + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); + } + else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + { + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinks) + else if(lifetimePolicy instanceof DeleteOnClose) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } else if(lifetimePolicy instanceof DeleteOnNoMessages) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + else { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } + + + // TODO convert AMQP 1-0 node properties to queue attributes + + final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); } catch (QpidSecurityException e) { @@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } - public void forceEnd() - { - } - - @Override public UUID getId() { @@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public AMQConnectionModel getConnectionModel() + public Connection_1_0 getConnectionModel() { - return _connection.getModel(); + return _connection; } @Override @@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public void close() { - // TODO - required for AMQSessionModel / management initiated closing + performCloseTasks(); + _endpoint.end(); + } + + protected void performCloseTasks() + { + + if(_closed.compareAndSet(false, true)) + { + List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList); + _taskList.clear(); + for(Action<? super Session_1_0> task : taskList) + { + task.performAction(this); + } + } } @Override public void close(AMQConstant cause, String message) { - // TODO - required for AMQSessionModel + performCloseTasks(); + final End end = new End(); + final Error theError = new Error(); + theError.setDescription(message); + theError.setCondition(ConnectionError.CONNECTION_FORCED); + end.setError(theError); + _endpoint.end(end); } @Override @@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public int getChannelId() { - // TODO - return 0; + return _endpoint.getSendingChannel(); } @Override @@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu connectionId, getClientID(), remoteAddress, - _vhost.getName(), // TODO - virtual host - 0) // TODO - channel) - + "] "; + _vhost.getName(), + _endpoint.getSendingChannel()) + "] "; } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(Session_1_0 o) { return getId().compareTo(o.getId()); } @@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu return _connection; } + @Override + public void addDeleteTask(final Action<? super Session_1_0> task) + { + if(!_closed.get()) + { + _taskList.add(task); + } + } + + @Override + public void removeDeleteTask(final Action<? super Session_1_0> task) + { + _taskList.remove(task); + } } diff --git a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 7d76a0ee8e..3ded20ae6a 100644 --- a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -971,33 +971,15 @@ class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementN } @Override - public AuthorizationHolder getAuthorizationHolder() - { - return null; - } - - @Override - public void setAuthorizationHolder(final AuthorizationHolder principalHolder) - { - - } - - @Override - public void setExclusiveOwningSession(final AMQSessionModel owner) - { - - } - - @Override - public AMQSessionModel getExclusiveOwningSession() + public boolean isExclusive() { - return null; + return false; } @Override - public boolean isExclusive() + public boolean verifySessionAccess(final AMQSessionModel<?, ?> session) { - return false; + return true; } @Override diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java index 407da0fd3f..ed5e195043 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java @@ -166,7 +166,7 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange public boolean isAutoDelete() { - return _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + return _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT; } public TabularData bindings() throws IOException, JMException diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 1365ceb06a..b44a752312 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; @@ -194,7 +195,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public boolean isAutoDelete() { - return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT; } public Long getMaximumMessageAge() @@ -264,12 +265,29 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public boolean isExclusive() { - return (Boolean) _queue.getAttribute(Queue.EXCLUSIVE); + final Object attribute = _queue.getAttribute(Queue.EXCLUSIVE); + return attribute != null && attribute != ExclusivityPolicy.NONE; } public void setExclusive(boolean exclusive) { - _queue.setAttribute(Queue.EXCLUSIVE, isExclusive(), exclusive); + if(exclusive) + { + Object currentValue = _queue.getAttribute(Queue.EXCLUSIVE); + if(currentValue == null || currentValue == ExclusivityPolicy.NONE) + { + _queue.setAttribute(Queue.EXCLUSIVE, currentValue, ExclusivityPolicy.CONTAINER); + } + } + else + { + Object currentValue = _queue.getAttribute(Queue.EXCLUSIVE); + if(currentValue != null && currentValue != ExclusivityPolicy.NONE) + { + _queue.setAttribute(Queue.EXCLUSIVE, currentValue, ExclusivityPolicy.NONE); + } + } + } public void setAlternateExchange(String exchangeName) throws OperationsException diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 2c88f83405..c39c3f74e9 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -181,8 +181,13 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi throws IOException, JMException { final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments); - getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, - QueueArgumentsConverter.convertWireArgsToModel(createArgs)); + + final Map<String, Object> attributes = QueueArgumentsConverter.convertWireArgsToModel(createArgs); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.DURABLE, durable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + + getConfiguredObject().createQueue(attributes); } diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index f94c206512..2874168ddf 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Statistics; @@ -154,7 +155,7 @@ public class QueueMBeanTest extends QpidTestCase public void testIsAutoDelete() throws Exception { - when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.AUTO_DELETE); + when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); assertTrue(_queueMBean.isAutoDelete()); } @@ -224,22 +225,31 @@ public class QueueMBeanTest extends QpidTestCase testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l); } + + /********** Other attributes **********/ + + public void testIsExclusive() throws Exception { - assertAttribute("exclusive", Boolean.TRUE, Queue.EXCLUSIVE); + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.CONTAINER); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", true); } public void testIsNotExclusive() throws Exception { - assertAttribute("exclusive", Boolean.FALSE, Queue.EXCLUSIVE); + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", false); } public void testSetExclusive() throws Exception { - testSetAttribute("exclusive", Queue.EXCLUSIVE, Boolean.FALSE , Boolean.TRUE); - } + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE); - /********** Other attributes **********/ + MBeanTestUtils.setMBeanAttribute(_queueMBean, "exclusive", Boolean.TRUE); + + verify(_mockQueue).setAttribute(Queue.EXCLUSIVE, ExclusivityPolicy.NONE, ExclusivityPolicy.CONTAINER); + + } public void testGetAlternateExchange() { diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index 4240dd5280..2dc2cb8d3b 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.mockito.ArgumentCaptor; public class VirtualHostManagerMBeanTest extends TestCase { @@ -68,8 +69,15 @@ public class VirtualHostManagerMBeanTest extends TestCase public void testCreateQueueWithNoOwner() throws Exception { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, null, true); + ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); + + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME, actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, EMPTY_ARGUMENT_MAP); } /** @@ -79,9 +87,15 @@ public class VirtualHostManagerMBeanTest extends TestCase public void testCreateQueueWithOwnerMappedThroughToDescription() throws Exception { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); + ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); - Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); + assertEquals(TEST_OWNER, actualAttributes.get(Queue.DESCRIPTION)); } public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception @@ -89,8 +103,15 @@ public class VirtualHostManagerMBeanTest extends TestCase Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); - Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); + + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); + assertEquals(TEST_DESCRIPTION, actualAttributes.get(Queue.DESCRIPTION)); } public void testDeleteQueue() throws Exception |