diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 698 |
1 files changed, 588 insertions, 110 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b2408f6dfa..aa7025e068 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.server.queue; +import java.security.Principal; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; @@ -28,11 +29,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; @@ -50,12 +54,16 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -78,19 +86,10 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private final String _name; /** null means shared */ - private final String _owner; - - private AuthorizationHolder _authorizationHolder; - - private boolean _exclusive = false; - private AMQSessionModel _exclusiveOwner; - + private String _description; private final boolean _durable; - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - private Exchange _alternateExchange; @@ -142,6 +141,10 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private long _flowResumeCapacity; + private ExclusivityPolicy _exclusivityPolicy; + private LifetimePolicy _lifetimePolicy; + private Object _exclusiveOwner; // could be connection, session or Principal + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); @@ -157,7 +160,8 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); + private final List<Action<? super Q>> _deleteTaskList = + new CopyOnWriteArrayList<Action<? super Q>>(); private LogSubject _logSubject; @@ -184,16 +188,98 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - - protected SimpleAMQQueue(UUID id, - String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments) + protected SimpleAMQQueue(VirtualHost virtualHost, + final AMQSessionModel<?,?> creatingSession, + Map<String, Object> attributes, + QueueEntryListFactory<E, Q, L> entryListFactory) { + UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); + + + _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, + Queue.EXCLUSIVE, + attributes, + ExclusivityPolicy.NONE); + _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT); + if(creatingSession != null) + { + + switch(_exclusivityPolicy) + { + + case PRINCIPAL: + _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal(); + break; + case CONTAINER: + _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName(); + break; + case CONNECTION: + _exclusiveOwner = creatingSession.getConnectionModel(); + addExclusivityConstraint(creatingSession.getConnectionModel()); + break; + case SESSION: + _exclusiveOwner = creatingSession; + addExclusivityConstraint(creatingSession); + break; + case NONE: + case LINK: + // nothing to do as if link no link associated until there is a consumer associated + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy: " + + _exclusivityPolicy + + " this is a coding error inside Qpid"); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = new AuthenticatedPrincipal(owner); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = owner; + } + } + + + if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession.getConnectionModel()); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } + else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } if (name == null) { @@ -207,12 +293,18 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA _name = name; _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _exclusive = exclusive; _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList((Q)this); - _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); + _entries = entryListFactory.createQueueEntryList((Q) this); + final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes); + + arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); + + _arguments = Collections.synchronizedMap(arguments); + _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); + + _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); + _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); @@ -220,30 +312,113 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA _logSubject = new QueueLogSubject(this); _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); + + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) + { + setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); + } + else + { + setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) + { + setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); + } + else + { + setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) + { + setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + attributes)); + } + else + { + setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) + { + setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + attributes)); + } + else + { + setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes()); + } + if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) + { + setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); + } + else + { + setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) + { + setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); + } + else + { + setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) + { + setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); + } + else + { + setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes()); + } + if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) + { + setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); + } + else + { + setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts()); + } + + final String ownerString; + switch(_exclusivityPolicy) + { + case PRINCIPAL: + ownerString = ((Principal) _exclusiveOwner).getName(); + break; + case CONTAINER: + ownerString = (String) _exclusiveOwner; + break; + default: + ownerString = null; + + } + // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(String.valueOf(_owner), + QueueMessages.CREATED(ownerString, _entries.getPriorities(), - _owner != null, - autoDelete, - durable, !durable, + ownerString != null , + _lifetimePolicy != LifetimePolicy.PERMANENT, + durable, + !durable, _entries.getPriorities() > 0)); - if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) + if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); + Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), + new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -256,6 +431,38 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } + private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject) + { + final Action<Deletable> deleteQueueTask = new Action<Deletable>() + { + @Override + public void performAction(final Deletable object) + { + try + { + getVirtualHost().removeQueue(SimpleAMQQueue.this); + } + catch (QpidSecurityException e) + { + throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " + + "lifetime was tied to an object being deleted"); + } + } + }; + + lifetimeObject.addDeleteTask(deleteQueueTask); + addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask)); + } + + private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject) + { + final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject); + final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction); + clearOwnerAction.setDeleteTask(deleteDeleteTask); + lifetimeObject.addDeleteTask(clearOwnerAction); + addDeleteTask(deleteDeleteTask); + } + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. @@ -303,12 +510,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA public boolean isExclusive() { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; + return _exclusivityPolicy != ExclusivityPolicy.NONE; } public Exchange getAlternateExchange() @@ -342,27 +544,27 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return _arguments.get(attrName); } - public boolean isAutoDelete() + @Override + public LifetimePolicy getLifetimePolicy() { - return _autoDelete; + return _lifetimePolicy; } public String getOwner() { - return _owner; - } - - public AuthorizationHolder getAuthorizationHolder() - { - return _authorizationHolder; - } - - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) - { - _authorizationHolder = authorizationHolder; + if(_exclusiveOwner != null) + { + switch(_exclusivityPolicy) + { + case CONTAINER: + return (String) _exclusiveOwner; + case PRINCIPAL: + return ((Principal)_exclusiveOwner).getName(); + } + } + return null; } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -381,7 +583,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<Consumer.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException + EnumSet<Consumer.Option> optionSet) + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException, + ConsumerAccessRefused { // Access control @@ -396,15 +600,77 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA throw new ExistingExclusiveConsumer(); } + switch(_exclusivityPolicy) + { + case CONNECTION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel(); + addExclusivityConstraint(target.getSessionModel().getConnectionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel().getConnectionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case SESSION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel(); + addExclusivityConstraint(target.getSessionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case LINK: + if(getConsumerCount() != 0) + { + throw new ConsumerAccessRefused(); + } + break; + case PRINCIPAL: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case CONTAINER: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case NONE: + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); - if (exclusive && !isTransient && getConsumerCount() != 0) - { - throw new ExistingConsumerPreventsExclusive(); - } - QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass, optionSet.contains(Consumer.Option.ACQUIRES), optionSet.contains(Consumer.Option.SEES_REQUEUES), @@ -473,11 +739,12 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); + consumer.setQueueContext(null); - if(!isDeleted() && isExclusive() && getConsumerCount() == 0) + if(_exclusivityPolicy == ExclusivityPolicy.LINK) { - setAuthorizationHolder(null); + _exclusiveOwner = null; } if(_messageGroupManager != null) @@ -495,8 +762,12 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) + if(!consumer.isTransient() + && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS ) + && getConsumerCount() == 0) { + if (_logger.isInfoEnabled()) { _logger.info("Auto-deleting queue:" + this); @@ -1266,12 +1537,14 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA }); } - public void addQueueDeleteTask(final Action<AMQQueue> task) + @Override + public void addDeleteTask(final Action<? super Q> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Action<AMQQueue> task) + @Override + public void removeDeleteTask(final Action<? super Q> task) { _deleteTaskList.remove(task); } @@ -1343,9 +1616,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } - for (Action<AMQQueue> task : _deleteTaskList) + for (Action<? super Q> task : _deleteTaskList) { - task.performAction(this); + task.performAction((Q)this); } _deleteTaskList.clear(); @@ -1940,6 +2213,26 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return _notificationChecks; } + private static class DeleteDeleteTask implements Action<Deletable> + { + + private final Deletable<? extends Deletable> _lifetimeObject; + private final Action<? super Deletable> _deleteQueueOwnerTask; + + public DeleteDeleteTask(final Deletable<? extends Deletable> lifetimeObject, + final Action<? super Deletable> deleteQueueOwnerTask) + { + _lifetimeObject = lifetimeObject; + _deleteQueueOwnerTask = deleteQueueOwnerTask; + } + + @Override + public void performAction(final Deletable object) + { + _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask); + } + } + private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State> { @@ -1990,38 +2283,6 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return ids; } - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusive = true; - _exclusiveOwner = exclusiveOwner; - } - - - public void configure(QueueConfiguration config) - { - if (config != null) - { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - setMaximumDeliveryCount(config.getMaxDeliveryCount()); - _capacity = config.getCapacity(); - _flowResumeCapacity = config.getFlowResumeCapacity(); - } - } - - public long getMessageDequeueCount() - { - return _dequeueCount.get(); - } - public long getTotalEnqueueSize() { return _enqueueSize.get(); @@ -2130,20 +2391,13 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA @Override public void setDescription(String description) { - if (description == null) - { - _arguments.remove(Queue.DESCRIPTION); - } - else - { - _arguments.put(Queue.DESCRIPTION, description); - } + _description = description; } @Override public String getDescription() { - return (String) _arguments.get(Queue.DESCRIPTION); + return _description; } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, @@ -2176,4 +2430,228 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } + @Override + public boolean verifySessionAccess(final AMQSessionModel<?, ?> session) + { + boolean allowed; + switch(_exclusivityPolicy) + { + case NONE: + allowed = true; + break; + case SESSION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session; + break; + case CONNECTION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel(); + break; + case PRINCIPAL: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal()); + break; + case CONTAINER: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName()); + break; + case LINK: + allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } + return allowed; + } + + @Override + public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) + throws ExistingConsumerPreventsExclusive + { + if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE)) + { + switch(desiredPolicy) + { + case NONE: + _exclusiveOwner = null; + break; + case PRINCIPAL: + switchToPrincipalExclusivity(); + break; + case CONTAINER: + switchToContainerExclusivity(); + break; + case CONNECTION: + switchToConnectionExclusivity(); + break; + case SESSION: + switchToSessionExclusivity(); + break; + case LINK: + switchToLinkExclusivity(); + break; + } + _exclusivityPolicy = desiredPolicy; + } + } + + private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive + { + switch (getConsumerCount()) + { + case 1: + _exclusiveSubscriber = getConsumerList().getHead().getConsumer(); + // deliberate fall through + case 0: + _exclusiveOwner = null; + break; + default: + throw new ExistingConsumerPreventsExclusive(); + } + + } + + private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive + { + + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + case CONTAINER: + case CONNECTION: + AMQSessionModel session = null; + for(Consumer c : getConsumers()) + { + if(session == null) + { + session = c.getSessionModel(); + } + else if(!session.equals(c.getSessionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = session; + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + case PRINCIPAL: + AMQConnectionModel con = null; + for(Consumer c : getConsumers()) + { + if(con == null) + { + con = c.getSessionModel().getConnectionModel(); + } + else if(!con.equals(c.getSessionModel().getConnectionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = con; + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + String containerID = null; + for(Consumer c : getConsumers()) + { + if(containerID == null) + { + containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = containerID; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + } + + private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + Principal principal = null; + for(Consumer c : getConsumers()) + { + if(principal == null) + { + principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = principal; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + } + + private class ClearOwnerAction implements Action<Deletable> + { + private final Deletable<? extends Deletable> _lifetimeObject; + private DeleteDeleteTask _deleteTask; + + public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject) + { + _lifetimeObject = lifetimeObject; + } + + @Override + public void performAction(final Deletable object) + { + if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject) + { + SimpleAMQQueue.this._exclusiveOwner = null; + } + if(_deleteTask != null) + { + removeDeleteTask(_deleteTask); + } + } + + public void setDeleteTask(final DeleteDeleteTask deleteTask) + { + _deleteTask = deleteTask; + } + } } |