diff options
Diffstat (limited to 'java/broker-core/src')
55 files changed, 1633 insertions, 955 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 25466d9c55..4e9d94c030 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.configuration; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; @@ -43,7 +45,6 @@ public class QueueConfiguration extends AbstractConfiguration CompositeConfiguration mungedConf = new CompositeConfiguration(); mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name))); - mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues")); setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf); } @@ -85,19 +86,33 @@ public class QueueConfiguration extends AbstractConfiguration return _vHostConfig; } + private boolean getDefaultedBoolean(String attribute) + { + final Configuration config = _vHostConfig.getConfig(); + if(config.containsKey("queues."+attribute)) + { + final boolean defaultValue = config.getBoolean("queues." + attribute); + return getBooleanValue(attribute, defaultValue); + } + else + { + return getBooleanValue(attribute); + } + } + public boolean getDurable() { - return getBooleanValue("durable"); + return getDefaultedBoolean("boolean"); } public boolean getExclusive() { - return getBooleanValue("exclusive"); + return getDefaultedBoolean("exclusive"); } public boolean getAutoDelete() { - return getBooleanValue("autodelete"); + return getDefaultedBoolean("autodelete"); } public String getOwner() @@ -107,17 +122,41 @@ public class QueueConfiguration extends AbstractConfiguration public boolean getPriority() { - return getBooleanValue("priority"); + return getDefaultedBoolean("priority"); } public int getPriorities() { - return getIntValue("priorities", -1); + final Configuration config = _vHostConfig.getConfig(); + + int defaultValue; + if(config.containsKey("queues.priorities")) + { + defaultValue = config.getInt("queues.priorities"); + } + else + { + defaultValue = -1; + } + return getIntValue("priorities", defaultValue); } public String getExchange() { - return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME); + final Configuration config = _vHostConfig.getConfig(); + + String defaultValue; + + if(config.containsKey("queues.exchange")) + { + defaultValue = config.getString("queues.exchange"); + } + else + { + defaultValue = ""; + } + + return getStringValue("exchange", defaultValue); } public List getRoutingKeys() @@ -137,37 +176,37 @@ public class QueueConfiguration extends AbstractConfiguration public int getMaximumMessageAge() { - return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge()); + return getIntValue("maximumMessageAge"); } public long getMaximumQueueDepth() { - return getLongValue("maximumQueueDepth", _vHostConfig.getMaximumQueueDepth()); + return getLongValue("maximumQueueDepth"); } public long getMaximumMessageSize() { - return getLongValue("maximumMessageSize", _vHostConfig.getMaximumMessageSize()); + return getLongValue("maximumMessageSize"); } public long getMaximumMessageCount() { - return getLongValue("maximumMessageCount", _vHostConfig.getMaximumMessageCount()); + return getLongValue("maximumMessageCount"); } public long getMinimumAlertRepeatGap() { - return getLongValue("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); + return getLongValue("minimumAlertRepeatGap"); } public long getCapacity() { - return getLongValue("capacity", _vHostConfig.getCapacity()); + return getLongValue("capacity"); } public long getFlowResumeCapacity() { - return getLongValue("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + return getLongValue("flowResumeCapacity"); } public boolean isLVQ() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 9870551313..ea0faf132e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import java.lang.ref.WeakReference; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; @@ -29,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -120,19 +122,25 @@ public class FilterSupport public static final class NoLocalFilter implements MessageFilter { - private final MessageSource _queue; + private final MessageSource<?,?> _queue; - public NoLocalFilter(MessageSource queue) + private NoLocalFilter(MessageSource queue) { _queue = queue; } public boolean matches(Filterable message) { - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || - exclusiveOwningSession.getConnectionReference() != message.getConnectionReference(); + final Collection<? extends Consumer> consumers = _queue.getConsumers(); + for(Consumer c : consumers) + { + if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) + { + return false; + } + } + return !consumers.isEmpty(); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 5b0e34b73e..dcbb693cf1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -46,7 +46,7 @@ public class ChannelLogSubject extends AbstractLogSubject AMQConnectionModel connection = session.getConnectionModel(); setLogStringWithFormat(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(), - (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(), + (connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(), (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(), (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(), session.getChannelId()); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java index 87c2377e0f..b1b8bc50ed 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -55,7 +55,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { if (!_upToDate) { - if (_session.getPrincipalAsString() != null) + if (_session.getAuthorizedPrincipal() != null) { if (_session.getVirtualHostName() != null) { @@ -71,7 +71,7 @@ public class ConnectionLogSubject extends AbstractLogSubject */ setLogString("[" + MessageFormat.format(CONNECTION_FORMAT, _session.getConnectionId(), - _session.getPrincipalAsString(), + _session.getAuthorizedPrincipal().getName(), _session.getRemoteAddressString(), _session.getVirtualHostName()) + "] "); @@ -82,7 +82,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { setLogString("[" + MessageFormat.format(USER_FORMAT, _session.getConnectionId(), - _session.getPrincipalAsString(), + _session.getAuthorizedPrincipal().getName(), _session.getRemoteAddressString()) + "] "); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index b947bc0ef6..33c8e26790 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -36,7 +36,8 @@ public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> <T extends ConsumerTarget> C addConsumer(T target, FilterManager filters, Class<? extends ServerMessage> messageClass, String consumerName, EnumSet<Consumer.Option> options) - throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException; + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException, + ConsumerAccessRefused; Collection<C> getConsumers(); @@ -44,16 +45,10 @@ public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> void removeConsumerRegistrationListener(ConsumerRegistrationListener<S> listener); - AuthorizationHolder getAuthorizationHolder(); - - void setAuthorizationHolder(AuthorizationHolder principalHolder); - - void setExclusiveOwningSession(AMQSessionModel owner); - - AMQSessionModel getExclusiveOwningSession(); - boolean isExclusive(); + boolean verifySessionAccess(AMQSessionModel<?,?> session); + interface ConsumerRegistrationListener<Q extends MessageSource<? extends Consumer,?>> { void consumerAdded(Q source, Consumer consumer); @@ -76,7 +71,6 @@ public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> public ExistingExclusiveConsumer() { - super(""); } } @@ -95,7 +89,15 @@ public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> { public ExistingConsumerPreventsExclusive() { - super(""); } } + + static final class ConsumerAccessRefused extends Exception + { + public ConsumerAccessRefused() + { + } + } + + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java new file mode 100644 index 0000000000..a559a69ab5 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum ExclusivityPolicy +{ + NONE, + SESSION, + CONNECTION, + CONTAINER, + PRINCIPAL, + LINK +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java index c9006f4e71..1e45f8f493 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java @@ -23,5 +23,9 @@ package org.apache.qpid.server.model; public enum LifetimePolicy { PERMANENT, - AUTO_DELETE + DELETE_ON_CONNECTION_CLOSE, + DELETE_ON_SESSION_END, + DELETE_ON_NO_OUTBOUND_LINKS, + DELETE_ON_NO_LINKS, + IN_USE } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index f0241f8b30..dd3e9e4b61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -160,7 +160,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); - int CURRENT_CONFIG_VERSION = 3; + int CURRENT_CONFIG_VERSION = 4; //children Collection<VirtualHostAlias> getAliases(); @@ -172,9 +172,8 @@ public interface VirtualHost extends ConfiguredObject LifetimePolicy lifetime, long ttl, String type, Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException; - Queue createQueue(String name, State initialState, boolean durable, - boolean exclusive, LifetimePolicy lifetime, long ttl, Map<String, Object> attributes) - throws AccessControlException, IllegalArgumentException; + Queue createQueue(Map<String, Object> attributes) + throws AccessControlException, IllegalArgumentException; Collection<String> getExchangeTypes(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index 88475809c9..265d4318f1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -170,7 +170,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding } else if(LIFETIME_POLICY.equals(name)) { - return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE || _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT || _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT ? LifetimePolicy.IN_USE : LifetimePolicy.PERMANENT; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index dda23f1cfc..3cb6493338 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.model.adapter; import java.security.AccessControlException; +import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -191,7 +192,8 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } else if(name.equals(PRINCIPAL)) { - return _connection.getPrincipalAsString(); + final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal(); + return authorizedPrincipal == null ? null : authorizedPrincipal.getName(); } else if(name.equals(PROPERTIES)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index cf6874030b..7935077a40 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -130,7 +130,7 @@ public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid. } else if(LIFETIME_POLICY.equals(name)) { - return LifetimePolicy.AUTO_DELETE; + return LifetimePolicy.DELETE_ON_SESSION_END; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index f7ecbd323a..d7b6b8bb75 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -201,7 +201,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa public LifetimePolicy getLifetimePolicy() { - return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; } public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired) @@ -330,7 +330,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } else if(LIFETIME_POLICY.equals(name)) { - return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 733b6cecc2..5d09cfa8e2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -31,15 +31,7 @@ import java.util.Map; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ConfiguredObjectFinder; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.IllegalStateTransitionException; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.QueueNotificationListener; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; @@ -62,7 +54,6 @@ final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter impl put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class); put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class); put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); - put(EXCLUSIVE, Boolean.class); put(DESCRIPTION, String.class); }}); @@ -208,7 +199,7 @@ final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter impl public LifetimePolicy getLifetimePolicy() { - return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy(); } public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired) @@ -274,9 +265,33 @@ final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter impl } else if(EXCLUSIVE.equals(name)) { - Boolean exclusiveFlag = (Boolean) desired; - _queue.setExclusive(exclusiveFlag); + ExclusivityPolicy desiredPolicy; + if(desired == null) + { + desiredPolicy = ExclusivityPolicy.NONE; + } + else if(desired instanceof ExclusivityPolicy) + { + desiredPolicy = (ExclusivityPolicy)desired; + } + else if (desired instanceof String) + { + desiredPolicy = ExclusivityPolicy.valueOf((String)desired); + } + else + { + throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName()); + } + try + { + _queue.setExclusivityPolicy(desiredPolicy); + } + catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive) + { + throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this"); + } return true; + } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -376,7 +391,7 @@ final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter impl } else if(EXCLUSIVE.equals(name)) { - return _queue.isExclusive(); + return _queue.getAttribute(Queue.EXCLUSIVE); } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -458,7 +473,7 @@ final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter impl } else if(LIFETIME_POLICY.equals(name)) { - return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy(); } else if(NAME.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 17ce69da4a..4cd7432f75 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.model.adapter; import java.io.File; import java.lang.reflect.Type; import java.security.AccessControlException; -import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -58,18 +57,16 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueType; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; @@ -356,7 +353,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual name, type, durable, - lifetime == LifetimePolicy.AUTO_DELETE, + lifetime != null && lifetime != LifetimePolicy.PERMANENT, alternateExchange); synchronized (_exchangeAdapters) { @@ -389,7 +386,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public Queue createQueue(Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException { - attributes = new HashMap<String, Object>(attributes); + checkVHostStateIsActive(); if (attributes.containsKey(Queue.QUEUE_TYPE)) { @@ -405,7 +402,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); } else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) { @@ -417,51 +414,12 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } - String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null); - State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); - boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); - LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); - long ttl = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l); - boolean exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false); - - attributes.remove(Queue.NAME); - attributes.remove(Queue.STATE); - attributes.remove(Queue.DURABLE); - attributes.remove(Queue.LIFETIME_POLICY); - attributes.remove(Queue.TIME_TO_LIVE); - - return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes); - } - - public Queue createQueue(final String name, - final State initialState, - final boolean durable, - boolean exclusive, - final LifetimePolicy lifetime, - final long ttl, - final Map<String, Object> attributes) - throws AccessControlException, IllegalArgumentException - { - checkVHostStateIsActive(); - - String owner = null; - if(exclusive) - { - Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject()); - if(authenticatedPrincipal != null) - { - owner = authenticatedPrincipal.getName(); - } - } - final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE; try { - AMQQueue queue = - _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, - durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes); + AMQQueue queue = _virtualHost.createQueue(null, attributes); synchronized (_queueAdapters) { @@ -471,15 +429,15 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } catch(QueueExistsException qe) { - throw new IllegalArgumentException("Queue with name "+name+" already exists"); + throw new IllegalArgumentException("Queue with name "+MapValueConverter.getStringAttribute(Queue.NAME,attributes)+" already exists"); } catch (QpidSecurityException e) { throw new AccessControlException(e.toString()); } - } + public String getName() { return (String)getAttribute(NAME); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index bc272f13dc..623cf62472 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -25,11 +25,13 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.util.Deletable; +import java.security.Principal; import java.util.List; import java.util.UUID; -public interface AMQConnectionModel extends StatisticsGatherer +public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T> { /** * Close the underlying Connection @@ -50,7 +52,7 @@ public interface AMQConnectionModel extends StatisticsGatherer * @param cause * @param message */ - public void closeSession(AMQSessionModel session, AMQConstant cause, String message); + public void closeSession(S session, AMQConstant cause, String message); public long getConnectionId(); @@ -59,15 +61,13 @@ public interface AMQConnectionModel extends StatisticsGatherer * * @return a list of {@link AMQSessionModel}s */ - public List<AMQSessionModel> getSessionModels(); + public List<S> getSessionModels(); /** * Return a {@link LogSubject} for the connection. */ public LogSubject getLogSubject(); - public String getUserName(); - public boolean isSessionNameUnique(byte[] name); String getRemoteAddressString(); @@ -78,7 +78,7 @@ public interface AMQConnectionModel extends StatisticsGatherer String getClientProduct(); - String getPrincipalAsString(); + Principal getAuthorizedPrincipal(); long getSessionCountLimit(); @@ -93,4 +93,6 @@ public interface AMQConnectionModel extends StatisticsGatherer boolean isStopped(); String getVirtualHostName(); + + String getRemoteContainerName(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 520489ab1f..db92f6950d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -26,17 +26,18 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.util.Deletable; /** * Session model interface. * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}. */ -public interface AMQSessionModel extends Comparable<AMQSessionModel> +public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T> { public UUID getId(); - public AMQConnectionModel getConnectionModel(); + public C getConnectionModel(); public String getClientID(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 237f2cd8aa..aceebc722b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -27,10 +27,13 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -38,9 +41,12 @@ import java.util.List; import java.util.Set; public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> - extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination + extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination, + Deletable<Q> { + void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive; + public interface NotificationListener { void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); @@ -66,9 +72,7 @@ public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C> long getTotalEnqueueCount(); - void setNoLocal(boolean b); - - boolean isAutoDelete(); + LifetimePolicy getLifetimePolicy(); String getOwner(); @@ -104,11 +108,6 @@ public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C> boolean resend(final E entry, final C consumer); - void addQueueDeleteTask(Action<AMQQueue> task); - void removeQueueDeleteTask(Action<AMQQueue> task); - - - List<E> getMessagesOnTheQueue(); List<Long> getMessagesOnTheQueue(int num); @@ -189,10 +188,6 @@ public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C> Collection<String> getAvailableAttributes(); Object getAttribute(String attrName); - void configure(QueueConfiguration config); - - void setExclusive(boolean exclusive); - /** * Gets the maximum delivery count. If a message on this queue * is delivered more than maximumDeliveryCount, the message will be diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 409c528a4b..5003db1385 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.server.queue; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; @@ -36,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -45,7 +48,6 @@ import org.apache.qpid.server.virtualhost.QueueExistsException; public class AMQQueueFactory implements QueueFactory { - public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; @@ -59,381 +61,207 @@ public class AMQQueueFactory implements QueueFactory { _virtualHost = virtualHost; _queueRegistry = queueRegistry; + } + + @Override + public AMQQueue restoreQueue(Map<String, Object> attributes) throws QpidSecurityException + { + return createOrRestoreQueue(null, attributes, false); + } - private abstract static class QueueProperty + @Override + public AMQQueue createQueue(final AMQSessionModel creatingSession, + Map<String, Object> attributes) throws QpidSecurityException + { + return createOrRestoreQueue(creatingSession, attributes, true); + } + + private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes, + boolean createInStore) throws QpidSecurityException { - private final String _argumentName; + String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - public QueueProperty(String argumentName) + QueueConfiguration config = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE) && config.getMaximumMessageAge() != 0) { - _argumentName = argumentName; + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, config.getMaximumMessageAge()); } - - public String getArgumentName() + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) && config.getMaximumQueueDepth() != 0) { - return _argumentName; + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, config.getMaximumQueueDepth()); } - - - public abstract void setPropertyValue(AMQQueue queue, Object value); - - } - - private abstract static class QueueLongProperty extends QueueProperty - { - - public QueueLongProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) && config.getMaximumMessageSize() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, config.getMaximumMessageSize()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) && config.getMaximumMessageCount() != 0) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).longValue()); - } - + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, config.getMaximumMessageCount()); } - - abstract void setPropertyValue(AMQQueue queue, long value); - - - } - - private abstract static class QueueIntegerProperty extends QueueProperty - { - public QueueIntegerProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_REPEAT_GAP) && config.getMinimumAlertRepeatGap() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_REPEAT_GAP, config.getMinimumAlertRepeatGap()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (config.getMaxDeliveryCount() != 0 && !attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).intValue()); - } - + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, config.getMaxDeliveryCount()); } - abstract void setPropertyValue(AMQQueue queue, int value); - } - - private static final QueueProperty[] DECLARABLE_PROPERTIES = { - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageAge(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageSize(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageCount(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumQueueDepth(value); - } - }, - new QueueLongProperty(Queue.ALERT_REPEAT_GAP) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMinimumAlertRepeatGap(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setCapacity(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setFlowResumeCapacity(value); - } - }, - new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) - { - public void setPropertyValue(AMQQueue queue, int value) - { - queue.setMaximumDeliveryCount(value); - } - } - }; - - @Override - public AMQQueue restoreQueue(UUID id, - String queueName, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false); - - } - - /** - * @param id the id to use. - * @param deleteOnNoConsumer - */ - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true); - } - - private AMQQueue createOrRestoreQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments, - boolean createInStore) throws QpidSecurityException - { - if (id == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) && config.getCapacity() != 0) { - throw new IllegalArgumentException("Queue id must not be null"); + attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, config.getCapacity()); } - if (queueName == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) && config.getFlowResumeCapacity() != 0) { - throw new IllegalArgumentException("Queue name must not be null"); + attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, config.getFlowResumeCapacity()); } - QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); - - boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + boolean createDLQ = createDLQ(attributes, config); if (createDLQ) { validateDLNames(queueName); } - int priorities = 1; - String conflationKey = null; - String sortingKey = null; - - if(arguments != null) - { - if(arguments.containsKey(Queue.LVQ_KEY)) - { - conflationKey = (String) arguments.get(Queue.LVQ_KEY); - if(conflationKey == null) - { - conflationKey = QPID_DEFAULT_LVQ_KEY; - } - } - else if(arguments.containsKey(Queue.PRIORITIES)) - { - Object prioritiesObj = arguments.get(Queue.PRIORITIES); - if(prioritiesObj instanceof Number) - { - priorities = ((Number)prioritiesObj).intValue(); - } - else if(prioritiesObj instanceof String) - { - try - { - priorities = Integer.parseInt(prioritiesObj.toString()); - } - catch (NumberFormatException e) - { - // TODO - should warn here of invalid format - } - } - else - { - // TODO - should warn here of invalid format - } - } - else if(arguments.containsKey(Queue.SORT_KEY)) - { - sortingKey = (String)arguments.get(Queue.SORT_KEY); - } - } + AMQQueue queue; - AMQQueue q; - if(sortingKey != null) + if(attributes.containsKey(Queue.SORT_KEY)) { - q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); + queue = new SortedQueue(_virtualHost, creatingSession, attributes); } - else if(conflationKey != null) + else if(attributes.containsKey(Queue.LVQ_KEY)) { - q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); + queue = new ConflationQueue(_virtualHost, creatingSession, attributes); } - else if(priorities > 1) + else if(attributes.containsKey(Queue.PRIORITIES)) { - q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); + queue = new PriorityQueue(_virtualHost, creatingSession, attributes); } else { - q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); + queue = new StandardQueue(_virtualHost, creatingSession, attributes); } - q.setDeleteOnNoConsumers(deleteOnNoConsumer); - //Register the new queue - _queueRegistry.registerQueue(q); + _queueRegistry.registerQueue(queue); - q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); - - if(arguments != null) + if(createDLQ) { - for(QueueProperty p : DECLARABLE_PROPERTIES) - { - if(arguments.containsKey(p.getArgumentName())) - { - p.setPropertyValue(q, arguments.get(p.getArgumentName())); - } - } - - if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) - { - q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); - } - + createDLQ(queue); } - - if(createDLQ) + else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) { - final String dlExchangeName = getDeadLetterExchangeName(queueName); - final String dlQueueName = getDeadLetterQueueName(queueName); - - Exchange dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; try { - dlExchange = _virtualHost.createExchange(dlExchangeId, - dlExchangeName, - ExchangeDefaults.FANOUT_EXCHANGE_CLASS, - true, false, null); - } - catch(ExchangeExistsException e) - { - // We're ok if the exchange already exists - dlExchange = e.getExistingExchange(); - } - catch (ReservedExchangeNameException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (AMQUnknownExchangeType e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); } - catch (UnknownExchangeException e) + catch(IllegalArgumentException e) { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(altExchangeAttr); } + queue.setAlternateExchange(altExchange); + } - AMQQueue dlQueue = null; + if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue); + } - synchronized(_queueRegistry) - { - dlQueue = _queueRegistry.getQueue(dlQueueName); + return queue; + } - if(dlQueue == null) - { - //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc - final Map<String, Object> args = new HashMap<String, Object>(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); - args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - - try - { - dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, - false, args); - } - catch (QueueExistsException e) - { - throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + - "queue already exists, however this occurred within " + - "a block where the queue existence had previously been " + - "checked, and no queue creation should have been " + - "possible from another thread", e); - } - } - } + private void createDLQ(final AMQQueue queue) throws QpidSecurityException + { + final String queueName = queue.getName(); + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - //ensure the queue is bound to the exchange - if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) - { - //actual routing key used does not matter due to use of fanout exchange, - //but we will make the key 'dlq' as it can be logged at creation. - dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); - } - q.setAlternateExchange(dlExchange); + Exchange dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + + try + { + dlExchange = _virtualHost.createExchange(dlExchangeId, + dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, + true, false, null); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + catch (ReservedExchangeNameException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); } - else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + catch (AMQUnknownExchangeType e) { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (UnknownExchangeException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } - final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); - Exchange altExchange; - try - { - altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) + AMQQueue dlQueue = null; + + synchronized(_queueRegistry) + { + dlQueue = _queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) { - altExchange = _virtualHost.getExchange(altExchangeAttr); + //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + try + { + + + args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName())); + args.put(Queue.NAME, dlQueueName); + args.put(Queue.DURABLE, true); + dlQueue = _virtualHost.createQueue(null, args); + } + catch (QueueExistsException e) + { + throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + + "queue already exists, however this occurred within " + + "a block where the queue existence had previously been " + + "checked, and no queue creation should have been " + + "possible from another thread", e); + } } - q.setAlternateExchange(altExchange); } - if (createInStore && q.isDurable() && !q.isAutoDelete()) + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); } - - return q; + queue.setAlternateExchange(dlExchange); } public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws QpidSecurityException { - String queueName = config.getName(); - - boolean durable = config.getDurable(); - boolean autodelete = config.getAutoDelete(); - boolean exclusive = config.getExclusive(); - String owner = config.getOwner(); - Map<String, Object> arguments = createQueueArgumentsFromConfig(config); - - // we need queues that are defined in config to have deterministic ids. - UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments); - q.configure(config); + Map<String, Object> arguments = createQueueAttributesFromConfig(_virtualHost, config); + + AMQQueue q = createOrRestoreQueue(null, arguments, false); return q; } @@ -471,16 +299,19 @@ public class AMQQueueFactory implements QueueFactory /** * Checks if DLQ is enabled for the queue. * - * @param autoDelete - * queue auto-delete flag * @param arguments * queue arguments * @param qConfig * queue configuration * @return true if DLQ enabled */ - protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + protected static boolean createDLQ(Map<String, Object> arguments, QueueConfiguration qConfig) { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + arguments, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + //feature is not to be enabled for temporary queues or when explicitly disabled by argument if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { @@ -525,46 +356,59 @@ public class AMQQueueFactory implements QueueFactory return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); } - private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config) + private static Map<String, Object> createQueueAttributesFromConfig(final VirtualHost virtualHost, + QueueConfiguration config) { - Map<String,Object> arguments = new HashMap<String,Object>(); + Map<String,Object> attributes = new HashMap<String,Object>(); if(config.getArguments() != null && !config.getArguments().isEmpty()) { - arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); + attributes.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); } if(config.isLVQ() || config.getLVQKey() != null) { - arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); + attributes.put(Queue.LVQ_KEY, + config.getLVQKey() == null ? ConflationQueue.DEFAULT_LVQ_KEY : config.getLVQKey()); } else if (config.getPriority() || config.getPriorities() > 0) { - arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + attributes.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); } else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) { - arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); + attributes.put(Queue.SORT_KEY, config.getQueueSortKey()); } if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); } if (config.getDescription() != null && !"".equals(config.getDescription())) { - arguments.put(Queue.DESCRIPTION, config.getDescription()); + attributes.put(Queue.DESCRIPTION, config.getDescription()); } - if (arguments.isEmpty()) + attributes.put(Queue.DURABLE, config.getDurable()); + attributes.put(Queue.LIFETIME_POLICY, + config.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT); + if(config.getExclusive()) { - return Collections.emptyMap(); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER); } - else + if(config.getOwner() != null) { - return arguments; + attributes.put(Queue.OWNER, config.getOwner()); } + + attributes.put(Queue.NAME, config.getName()); + + // we need queues that are defined in config to have deterministic ids. + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(config.getName(), virtualHost.getName())); + + + return attributes; } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index a1ff51959c..f350368634 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -22,22 +22,32 @@ package org.apache.qpid.server.queue; import java.util.Map; -import java.util.UUID; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { - protected ConflationQueue(UUID id, - String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - Map<String, Object> args, String conflationKey) + public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + + + protected ConflationQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, Map<String, Object> attributes) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); + super(virtualHost, creatingSession, attributes, entryList(attributes)); + } + + private static ConflationQueueList.Factory entryList(final Map<String, Object> attributes) + { + + String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY, + attributes, + DEFAULT_LVQ_KEY); + + // conflation key can still be null if it was present in the map with a null value + return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); } public String getConflationKey() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 7cf245c8f8..c9ac3f3621 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,19 +20,20 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L> { - protected OutOfOrderQueue(UUID id, String name, boolean durable, - String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments) + protected OutOfOrderQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map<String, Object> attributes, + QueueEntryListFactory<E, Q, L> entryListFactory) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + super(virtualHost, creatingSession, attributes, entryListFactory); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 4440d045d1..1e41a0fb3e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -20,23 +20,31 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList> { - protected PriorityQueue(UUID id, - final String name, - final boolean durable, - final String owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - Map<String, Object> arguments, int priorities) + + public static final int DEFAULT_PRIORITY_LEVELS = 10; + + protected PriorityQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map<String, Object> attributes) + { + super(virtualHost, creatingSession, attributes, entryList(attributes)); + } + + private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); + final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes, + DEFAULT_PRIORITY_LEVELS); + + return new PriorityQueueList.Factory(priorities); } public int getPriorities() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 589f385d22..49123f8412 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -109,7 +109,7 @@ public class QueueArgumentsConverter } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) { - modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); } if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 8c0386c7b4..62a2d93b0f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -22,25 +22,15 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.UUID; + +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.QpidSecurityException; public interface QueueFactory { - AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, + AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QpidSecurityException; - AMQQueue restoreQueue(UUID id, - String queueName, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException; + AMQQueue restoreQueue(Map<String, Object> arguments) throws QpidSecurityException; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b2408f6dfa..aa7025e068 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.server.queue; +import java.security.Principal; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; @@ -28,11 +29,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; @@ -50,12 +54,16 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -78,19 +86,10 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private final String _name; /** null means shared */ - private final String _owner; - - private AuthorizationHolder _authorizationHolder; - - private boolean _exclusive = false; - private AMQSessionModel _exclusiveOwner; - + private String _description; private final boolean _durable; - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - private Exchange _alternateExchange; @@ -142,6 +141,10 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private long _flowResumeCapacity; + private ExclusivityPolicy _exclusivityPolicy; + private LifetimePolicy _lifetimePolicy; + private Object _exclusiveOwner; // could be connection, session or Principal + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); @@ -157,7 +160,8 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); + private final List<Action<? super Q>> _deleteTaskList = + new CopyOnWriteArrayList<Action<? super Q>>(); private LogSubject _logSubject; @@ -184,16 +188,98 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - - protected SimpleAMQQueue(UUID id, - String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments) + protected SimpleAMQQueue(VirtualHost virtualHost, + final AMQSessionModel<?,?> creatingSession, + Map<String, Object> attributes, + QueueEntryListFactory<E, Q, L> entryListFactory) { + UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); + + + _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, + Queue.EXCLUSIVE, + attributes, + ExclusivityPolicy.NONE); + _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT); + if(creatingSession != null) + { + + switch(_exclusivityPolicy) + { + + case PRINCIPAL: + _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal(); + break; + case CONTAINER: + _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName(); + break; + case CONNECTION: + _exclusiveOwner = creatingSession.getConnectionModel(); + addExclusivityConstraint(creatingSession.getConnectionModel()); + break; + case SESSION: + _exclusiveOwner = creatingSession; + addExclusivityConstraint(creatingSession); + break; + case NONE: + case LINK: + // nothing to do as if link no link associated until there is a consumer associated + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy: " + + _exclusivityPolicy + + " this is a coding error inside Qpid"); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = new AuthenticatedPrincipal(owner); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = owner; + } + } + + + if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession.getConnectionModel()); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } + else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } if (name == null) { @@ -207,12 +293,18 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA _name = name; _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _exclusive = exclusive; _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList((Q)this); - _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); + _entries = entryListFactory.createQueueEntryList((Q) this); + final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes); + + arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); + + _arguments = Collections.synchronizedMap(arguments); + _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); + + _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); + _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); @@ -220,30 +312,113 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA _logSubject = new QueueLogSubject(this); _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); + + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) + { + setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); + } + else + { + setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) + { + setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); + } + else + { + setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) + { + setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + attributes)); + } + else + { + setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) + { + setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + attributes)); + } + else + { + setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes()); + } + if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) + { + setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); + } + else + { + setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) + { + setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); + } + else + { + setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) + { + setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); + } + else + { + setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes()); + } + if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) + { + setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); + } + else + { + setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts()); + } + + final String ownerString; + switch(_exclusivityPolicy) + { + case PRINCIPAL: + ownerString = ((Principal) _exclusiveOwner).getName(); + break; + case CONTAINER: + ownerString = (String) _exclusiveOwner; + break; + default: + ownerString = null; + + } + // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(String.valueOf(_owner), + QueueMessages.CREATED(ownerString, _entries.getPriorities(), - _owner != null, - autoDelete, - durable, !durable, + ownerString != null , + _lifetimePolicy != LifetimePolicy.PERMANENT, + durable, + !durable, _entries.getPriorities() > 0)); - if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) + if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); + Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), + new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -256,6 +431,38 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } + private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject) + { + final Action<Deletable> deleteQueueTask = new Action<Deletable>() + { + @Override + public void performAction(final Deletable object) + { + try + { + getVirtualHost().removeQueue(SimpleAMQQueue.this); + } + catch (QpidSecurityException e) + { + throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " + + "lifetime was tied to an object being deleted"); + } + } + }; + + lifetimeObject.addDeleteTask(deleteQueueTask); + addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask)); + } + + private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject) + { + final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject); + final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction); + clearOwnerAction.setDeleteTask(deleteDeleteTask); + lifetimeObject.addDeleteTask(clearOwnerAction); + addDeleteTask(deleteDeleteTask); + } + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. @@ -303,12 +510,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA public boolean isExclusive() { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; + return _exclusivityPolicy != ExclusivityPolicy.NONE; } public Exchange getAlternateExchange() @@ -342,27 +544,27 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return _arguments.get(attrName); } - public boolean isAutoDelete() + @Override + public LifetimePolicy getLifetimePolicy() { - return _autoDelete; + return _lifetimePolicy; } public String getOwner() { - return _owner; - } - - public AuthorizationHolder getAuthorizationHolder() - { - return _authorizationHolder; - } - - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) - { - _authorizationHolder = authorizationHolder; + if(_exclusiveOwner != null) + { + switch(_exclusivityPolicy) + { + case CONTAINER: + return (String) _exclusiveOwner; + case PRINCIPAL: + return ((Principal)_exclusiveOwner).getName(); + } + } + return null; } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -381,7 +583,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<Consumer.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException + EnumSet<Consumer.Option> optionSet) + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException, + ConsumerAccessRefused { // Access control @@ -396,15 +600,77 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA throw new ExistingExclusiveConsumer(); } + switch(_exclusivityPolicy) + { + case CONNECTION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel(); + addExclusivityConstraint(target.getSessionModel().getConnectionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel().getConnectionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case SESSION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel(); + addExclusivityConstraint(target.getSessionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case LINK: + if(getConsumerCount() != 0) + { + throw new ConsumerAccessRefused(); + } + break; + case PRINCIPAL: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case CONTAINER: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case NONE: + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); - if (exclusive && !isTransient && getConsumerCount() != 0) - { - throw new ExistingConsumerPreventsExclusive(); - } - QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass, optionSet.contains(Consumer.Option.ACQUIRES), optionSet.contains(Consumer.Option.SEES_REQUEUES), @@ -473,11 +739,12 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); + consumer.setQueueContext(null); - if(!isDeleted() && isExclusive() && getConsumerCount() == 0) + if(_exclusivityPolicy == ExclusivityPolicy.LINK) { - setAuthorizationHolder(null); + _exclusiveOwner = null; } if(_messageGroupManager != null) @@ -495,8 +762,12 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) + if(!consumer.isTransient() + && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS ) + && getConsumerCount() == 0) { + if (_logger.isInfoEnabled()) { _logger.info("Auto-deleting queue:" + this); @@ -1266,12 +1537,14 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA }); } - public void addQueueDeleteTask(final Action<AMQQueue> task) + @Override + public void addDeleteTask(final Action<? super Q> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Action<AMQQueue> task) + @Override + public void removeDeleteTask(final Action<? super Q> task) { _deleteTaskList.remove(task); } @@ -1343,9 +1616,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } - for (Action<AMQQueue> task : _deleteTaskList) + for (Action<? super Q> task : _deleteTaskList) { - task.performAction(this); + task.performAction((Q)this); } _deleteTaskList.clear(); @@ -1940,6 +2213,26 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return _notificationChecks; } + private static class DeleteDeleteTask implements Action<Deletable> + { + + private final Deletable<? extends Deletable> _lifetimeObject; + private final Action<? super Deletable> _deleteQueueOwnerTask; + + public DeleteDeleteTask(final Deletable<? extends Deletable> lifetimeObject, + final Action<? super Deletable> deleteQueueOwnerTask) + { + _lifetimeObject = lifetimeObject; + _deleteQueueOwnerTask = deleteQueueOwnerTask; + } + + @Override + public void performAction(final Deletable object) + { + _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask); + } + } + private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State> { @@ -1990,38 +2283,6 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA return ids; } - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusive = true; - _exclusiveOwner = exclusiveOwner; - } - - - public void configure(QueueConfiguration config) - { - if (config != null) - { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - setMaximumDeliveryCount(config.getMaxDeliveryCount()); - _capacity = config.getCapacity(); - _flowResumeCapacity = config.getFlowResumeCapacity(); - } - } - - public long getMessageDequeueCount() - { - return _dequeueCount.get(); - } - public long getTotalEnqueueSize() { return _enqueueSize.get(); @@ -2130,20 +2391,13 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA @Override public void setDescription(String description) { - if (description == null) - { - _arguments.remove(Queue.DESCRIPTION); - } - else - { - _arguments.put(Queue.DESCRIPTION, description); - } + _description = description; } @Override public String getDescription() { - return (String) _arguments.get(Queue.DESCRIPTION); + return _description; } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, @@ -2176,4 +2430,228 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } + @Override + public boolean verifySessionAccess(final AMQSessionModel<?, ?> session) + { + boolean allowed; + switch(_exclusivityPolicy) + { + case NONE: + allowed = true; + break; + case SESSION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session; + break; + case CONNECTION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel(); + break; + case PRINCIPAL: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal()); + break; + case CONTAINER: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName()); + break; + case LINK: + allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } + return allowed; + } + + @Override + public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) + throws ExistingConsumerPreventsExclusive + { + if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE)) + { + switch(desiredPolicy) + { + case NONE: + _exclusiveOwner = null; + break; + case PRINCIPAL: + switchToPrincipalExclusivity(); + break; + case CONTAINER: + switchToContainerExclusivity(); + break; + case CONNECTION: + switchToConnectionExclusivity(); + break; + case SESSION: + switchToSessionExclusivity(); + break; + case LINK: + switchToLinkExclusivity(); + break; + } + _exclusivityPolicy = desiredPolicy; + } + } + + private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive + { + switch (getConsumerCount()) + { + case 1: + _exclusiveSubscriber = getConsumerList().getHead().getConsumer(); + // deliberate fall through + case 0: + _exclusiveOwner = null; + break; + default: + throw new ExistingConsumerPreventsExclusive(); + } + + } + + private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive + { + + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + case CONTAINER: + case CONNECTION: + AMQSessionModel session = null; + for(Consumer c : getConsumers()) + { + if(session == null) + { + session = c.getSessionModel(); + } + else if(!session.equals(c.getSessionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = session; + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + case PRINCIPAL: + AMQConnectionModel con = null; + for(Consumer c : getConsumers()) + { + if(con == null) + { + con = c.getSessionModel().getConnectionModel(); + } + else if(!con.equals(c.getSessionModel().getConnectionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = con; + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + String containerID = null; + for(Consumer c : getConsumers()) + { + if(containerID == null) + { + containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = containerID; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + } + + private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + Principal principal = null; + for(Consumer c : getConsumers()) + { + if(principal == null) + { + principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = principal; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + } + + private class ClearOwnerAction implements Action<Deletable> + { + private final Deletable<? extends Deletable> _lifetimeObject; + private DeleteDeleteTask _deleteTask; + + public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject) + { + _lifetimeObject = lifetimeObject; + } + + @Override + public void performAction(final Deletable object) + { + if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject) + { + SimpleAMQQueue.this._exclusiveOwner = null; + } + if(_deleteTask != null) + { + removeDeleteTask(_deleteTask); + } + } + + public void setDeleteTask(final DeleteDeleteTask deleteTask) + { + _deleteTask = deleteTask; + } + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index f85f9479bb..d1da7feddc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -21,11 +21,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList> { @@ -35,28 +37,26 @@ public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, private final Object _sortedQueueLock = new Object(); private final String _sortedPropertyName; - protected SortedQueue(UUID id, final String name, - final boolean durable, final String owner, final boolean autoDelete, - final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) + protected SortedQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map<String, Object> attributes, + QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory) { - this(id, name, durable, owner, autoDelete, exclusive, - virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName)); + super(virtualHost, creatingSession, attributes, factory); + _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes); } - protected SortedQueue(UUID id, final String name, - final boolean durable, final String owner, final boolean autoDelete, - final boolean exclusive, final VirtualHost virtualHost, - Map<String, Object> arguments, - String sortedPropertyName, - QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory) + protected SortedQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, Map<String, Object> attributes) { - super(id, name, durable, owner, autoDelete, exclusive, - virtualHost, factory, arguments); - this._sortedPropertyName = sortedPropertyName; + this(virtualHost, + creatingSession, attributes, + new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes))); } + public String getSortedPropertyName() { return _sortedPropertyName; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java index e2ccdc45cf..8a0570eb4e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -20,22 +20,16 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList> { - public StandardQueue(final UUID id, - final String name, - final boolean durable, - final String owner, - final boolean autoDelete, - final boolean exclusive, - final VirtualHost virtualHost, - final Map<String, Object> arguments) + public StandardQueue(final VirtualHost virtualHost, + final AMQSessionModel creatingSession, final Map<String, Object> arguments) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments); + super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory()); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index a379f85bbb..b9e0e5920f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.queue.AMQQueue; /** @@ -139,8 +140,8 @@ public class ObjectProperties { setName(queue.getName()); - put(Property.AUTO_DELETE, queue.isAutoDelete()); - put(Property.TEMPORARY, queue.isAutoDelete()); + put(Property.AUTO_DELETE, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); + put(Property.TEMPORARY, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); put(Property.DURABLE, queue.isDurable()); put(Property.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) @@ -151,10 +152,7 @@ public class ObjectProperties { put(Property.OWNER, queue.getOwner()); } - else if (queue.getAuthorizationHolder() != null) - { - put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName()); - } + } public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index fc5806ccce..096734ff73 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -42,17 +42,11 @@ public class DurableConfigurationStoreHelper private static final String BINDING = Binding.class.getSimpleName(); private static final String EXCHANGE = Exchange.class.getSimpleName(); private static final String QUEUE = Queue.class.getSimpleName(); - private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME, - Queue.OWNER, - Queue.EXCLUSIVE, - Queue.ALTERNATE_EXCHANGE)); + private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.ALTERNATE_EXCHANGE)); public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) { Map<String, Object> attributesMap = new LinkedHashMap<String, Object>(); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.OWNER, queue.getOwner()); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) { @@ -75,9 +69,6 @@ public class DurableConfigurationStoreHelper public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) { Map<String, Object> attributesMap = new HashMap<String, Object>(); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.OWNER, queue.getOwner()); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); @@ -103,7 +94,7 @@ public class DurableConfigurationStoreHelper Map<String, Object> attributesMap = new HashMap<String, Object>(); attributesMap.put(Exchange.NAME, exchange.getName()); attributesMap.put(Exchange.TYPE, exchange.getTypeName()); - attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() + attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name()); store.create(exchange.getId(), EXCHANGE, attributesMap); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java new file mode 100644 index 0000000000..a6b16bbfeb --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public interface Deletable<T extends Deletable> +{ + void addDeleteTask(Action<? super T> task); + void removeDeleteTask(Action<? super T> task); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java index 37e0177b00..3543ce3bcf 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; public class MapValueConverter { @@ -217,6 +218,13 @@ public class MapValueConverter return getIntegerAttribute(name, attributes, null); } + public static Long getLongAttribute(String name, Map<String,Object> attributes) + { + assertMandatoryAttribute(name, attributes); + Object obj = attributes.get(name); + return toLong(name, obj, null); + } + public static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue) { Object obj = attributes.get(name); @@ -409,4 +417,41 @@ public class MapValueConverter return (T) value; } + + public static UUID getUUIDAttribute(String name, Map<String, Object> attributes) + { + assertMandatoryAttribute(name, attributes); + return getUUIDAttribute(name, attributes, null); + } + + public static UUID getUUIDAttribute(String name, Map<String,Object> attributes, UUID defaultVal) + { + final Object value = attributes.get(name); + return toUUID(value, defaultVal); + } + + private static UUID toUUID(final Object value, final UUID defaultVal) + { + if(value == null) + { + return defaultVal; + } + else if(value instanceof UUID) + { + return (UUID)value; + } + else if(value instanceof String) + { + return UUID.fromString((String)value); + } + else if(value instanceof byte[]) + { + return UUID.nameUUIDFromBytes((byte[])value); + } + else + { + throw new IllegalArgumentException("Cannot convert " + value.getClass().getName() + " to UUID"); + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index aa70bb3e8d..0e9b879316 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener @@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg int purged = queue.delete(); getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !queue.isAutoDelete()) + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) { DurableConfigurationStore store = getDurableConfigurationStore(); DurableConfigurationStoreHelper.removeQueue(store, queue); @@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException { + // make a copy as we may augment (with an ID for example) + attributes = new LinkedHashMap<String, Object>(attributes); - if (queueName == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } + String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); + ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - // Access check + // Access check if (!getSecurityManager().authoriseCreateQueue(autoDelete, durable, - exclusive, + exclusive != null && exclusive != ExclusivityPolicy.NONE, null, null, queueName, @@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); } - if(id == null) + if(!attributes.containsKey(Queue.ID)) { - id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName()); while(_queueRegistry.getQueue(id) != null) { id = UUID.randomUUID(); } + attributes.put(Queue.ID, id); } - else if(_queueRegistry.getQueue(id) != null) + else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) { - throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + throw new QueueExistsException("Queue with id " + + MapValueConverter.getUUIDAttribute(Queue.ID, + attributes) + + " already exists", _queueRegistry.getQueue(queueName)); } - return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, - arguments); + + + return _queueFactory.createQueue(creatingSession, attributes); } } @@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on connection " + connection); } - for (AMQSessionModel session : connection.getSessionModels()) + for (AMQSessionModel<?,?> session : connection.getSessionModels()) { if (_logger.isDebugEnabled()) { @@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { return _model; } + + } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return getConfiguration().getMaximumMessageAge(); + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return getConfiguration().getMaximumMessageSize(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return getConfiguration().getMaximumMessageCount(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return getConfiguration().getMaximumQueueDepth(); + } + + @Override + public long getDefaultAlertRepeatGap() + { + return getConfiguration().getMinimumAlertRepeatGap(); + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return getConfiguration().getCapacity(); + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return getConfiguration().getFlowResumeCapacity(); + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return getConfiguration().getMaxDeliveryCount(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 12f8c7dae8..88caf73032 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -42,6 +42,7 @@ import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; public class DefaultUpgraderProvider implements UpgraderProvider { + public static final String EXCLUSIVE = "exclusive"; private final ExchangeRegistry _exchangeRegistry; private final VirtualHost _virtualHost; @@ -63,7 +64,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); case 2: currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); - + case 3: + currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -263,4 +265,49 @@ public class DefaultUpgraderProvider implements UpgraderProvider } } + /* + * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum + * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" + * ensure OWNER is null unless the exclusivity policy is CONTAINER + */ + private class Version3Upgrader extends NonNullUpgrader + { + + @Override + public void configuredObject(UUID id, String type, Map<String, Object> attributes) + { + if(Queue.class.getSimpleName().equals(type)) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes); + if(attributes.get(EXCLUSIVE) instanceof Boolean) + { + boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE); + newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); + if(!isExclusive && attributes.containsKey("owner")) + { + newAttributes.remove("owner"); + } + } + else + { + newAttributes.remove("owner"); + } + if(!attributes.containsKey("durable")) + { + newAttributes.put("durable","true"); + } + attributes = newAttributes; + getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 59ff1ce6a1..c687cbda92 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -69,7 +69,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT; try { _exchange = _exchangeRegistry.getExchange(id); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index fd836fdd98..621ea02059 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -104,13 +104,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ public AMQQueue resolve() { String queueName = (String) _attributes.get(Queue.NAME); - String owner = (String) _attributes.get(Queue.OWNER); - boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); - - Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes); - queueArgumentsMap.remove(Queue.NAME); - queueArgumentsMap.remove(Queue.OWNER); - queueArgumentsMap.remove(Queue.EXCLUSIVE); try { @@ -122,8 +115,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ if (_queue == null) { - _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive, - false, queueArgumentsMap); + Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes); + attributes.put(Queue.ID, _id); + _queue = _queueFactory.restoreQueue(attributes); } } catch (QpidSecurityException e) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7cd9045af8..9996684bad 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.QpidSecurityException; @@ -59,14 +60,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable int removeQueue(AMQQueue queue) throws QpidSecurityException; - AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException; + AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException; Exchange createExchange(UUID id, @@ -130,4 +124,20 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable public void block(); public void unblock(); + + long getDefaultAlertThresholdMessageAge(); + + long getDefaultAlertThresholdMessageSize(); + + long getDefaultAlertThresholdQueueDepthMessages(); + + long getDefaultAlertThresholdQueueDepthBytes(); + + long getDefaultAlertRepeatGap(); + + long getDefaultQueueFlowControlSizeBytes(); + + long getDefaultQueueFlowResumeSizeBytes(); + + int getDefaultMaximumDeliveryAttempts(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 96ac92a0f9..2c3420a718 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -137,9 +137,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageAge()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageAge()); } public void testGetMaximumQueueDepth() throws ConfigurationException @@ -153,9 +150,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumQueueDepth()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumQueueDepth()); } public void testGetMaximumMessageSize() throws ConfigurationException @@ -169,9 +163,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageSize()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageSize()); } public void testGetMaximumMessageCount() throws ConfigurationException @@ -185,22 +176,11 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageCount()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageCount()); } public void testGetMinimumAlertRepeatGap() throws Exception { - // set broker attribute ALERT_REPEAT_GAP to 10 - when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10); - - // check that broker level setting is available on queue configuration QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); - assertEquals(10, qConf.getMinimumAlertRepeatGap()); - - // remove configuration for ALERT_REPEAT_GAP on broker level - when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(null); // Check default value qConf = new QueueConfiguration("test", _emptyConf); @@ -211,9 +191,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMinimumAlertRepeatGap()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMinimumAlertRepeatGap()); } public void testSortQueueConfiguration() throws ConfigurationException diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 5a12a411cf..1358798a38 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -35,8 +35,10 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; +import java.security.Principal; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -333,14 +335,26 @@ public class MockConsumer implements ConsumerTarget } @Override - public int compareTo(AMQSessionModel o) + public void close(AMQConstant cause, String message) { - return getId().compareTo(o.getId()); } @Override - public void close(AMQConstant cause, String message) + public void addDeleteTask(final Action task) { + + } + + @Override + public void removeDeleteTask(final Action task) + { + + } + + @Override + public int compareTo(final Object o) + { + return 0; } } @@ -431,12 +445,6 @@ public class MockConsumer implements ConsumerTarget } @Override - public String getUserName() - { - return null; - } - - @Override public boolean isSessionNameUnique(byte[] name) { return false; @@ -455,6 +463,12 @@ public class MockConsumer implements ConsumerTarget } @Override + public String getRemoteContainerName() + { + return null; + } + + @Override public String getClientVersion() { return null; @@ -467,7 +481,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public String getPrincipalAsString() + public Principal getAuthorizedPrincipal() { return null; } @@ -512,5 +526,17 @@ public class MockConsumer implements ConsumerTarget { return null; } + + @Override + public void addDeleteTask(final Action task) + { + + } + + @Override + public void removeDeleteTask(final Action task) + { + + } } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 573f4a4aaa..e8a2223b15 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,17 +20,23 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import junit.framework.Assert; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -70,10 +76,17 @@ public class TopicExchangeTest extends QpidTestCase } } + private AMQQueue<?,?,?> createQueue(String name) throws QpidSecurityException, QueueExistsException + { + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, name); + return _vhost.createQueue(null, attributes); + } + public void testNoRoute() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("a*#b"); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -84,8 +97,7 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("ab"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -107,7 +119,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); + AMQQueue<?,?,?> queue = createQueue("a*"); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -138,7 +150,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); + AMQQueue<?,?,?> queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -189,8 +201,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); routeMessage("a.c.d.b",0l); @@ -215,8 +226,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchAfterHash() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -254,8 +264,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -276,8 +285,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws Exception { - AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue<?,?,?> queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -298,8 +306,7 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); int queueCount = routeMessage("a.b.c",0l); @@ -328,8 +335,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreRouting() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -342,8 +348,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index 13e637542b..4d38025068 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.protocol.AMQConnectionModel; +import java.security.Principal; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,9 +45,12 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject { super.setUp(); + final Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(USER); + _connection = mock(AMQConnectionModel.class); when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); - when(_connection.getPrincipalAsString()).thenReturn(USER); + when(_connection.getAuthorizedPrincipal()).thenReturn(principal); when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); when(_connection.getVirtualHostName()).thenReturn(VHOST); _subject = new ConnectionLogSubject(_connection); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 35ffd08863..4b6a51f84b 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.queue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,9 +40,10 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -98,33 +94,19 @@ public class AMQQueueFactoryTest extends QpidTestCase private void delegateVhostQueueCreation() throws Exception { - final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class); - final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class); - - when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(), - autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then( + + final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class); + + when(_virtualHost.createQueue(any(AMQSessionModel.class), attributes.capture())).then( new Answer<AMQQueue>() { @Override public AMQQueue answer(InvocationOnMock invocation) throws Throwable { - return _queueFactory.createQueue(id.getValue(), - queueName.getValue(), - durable.getValue(), - owner.getValue(), - autoDelete.getValue(), - exclusive.getValue(), - deleteOnNoConsumer.getValue(), - arguments.getValue()); + return _queueFactory.createQueue(null, attributes.getValue()); } } - ); + ); } private void mockQueueRegistry() @@ -217,17 +199,14 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testPriorityQueueRegistration() throws Exception { - Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testPriorityQueue"); + attributes.put(Queue.PRIORITIES, 5); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testPriorityQueue", - false, - "owner", - false, - false, - false, - attributes); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -240,10 +219,12 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = getName(); String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, - false, - null); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass()); verifyQueueRegistered(queueName); @@ -261,7 +242,6 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueEnabled() throws Exception { - Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -270,14 +250,13 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - attributes); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -314,14 +293,11 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - null); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); @@ -348,7 +324,8 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueDisabled() throws Exception { - Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false); + Map<String,Object> attributes = new HashMap<String, Object>(); + String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -357,14 +334,11 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - attributes); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); @@ -382,7 +356,6 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception { - Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -391,16 +364,18 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + //create an autodelete queue - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - true, - false, - false, - attributes); - assertTrue("Queue should be autodelete", queue.isAutoDelete()); + AMQQueue queue = _queueFactory.createQueue(null, attributes); + assertEquals("Queue should be autodelete", + LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS, + queue.getLifetimePolicy()); //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); @@ -417,16 +392,13 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testMaximumDeliveryCount() throws Exception { - Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCount"); + + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testMaximumDeliveryCount", - false, - "owner", - false, - false, - false, - attributes); + final AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); @@ -440,14 +412,11 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testMaximumDeliveryCount", - false, - "owner", - false, - false, - false, - null); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault"); + + final AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); @@ -462,15 +431,16 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, - false, - null); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + + _queueFactory.createQueue(null, attributes); fail("queue with null name can not be created!"); } catch (Exception e) { assertTrue(e instanceof IllegalArgumentException); - assertEquals("Queue name must not be null", e.getMessage()); + assertEquals("Value for attribute name is not found", e.getMessage()); } } @@ -486,9 +456,14 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, false, attributes); + + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + _queueFactory.createQueue(null, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -511,9 +486,14 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, false, attributes); + + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + + _queueFactory.createQueue(null, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -528,6 +508,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { Map<String,String> arguments = new HashMap<String, String>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"mykey"); arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java index c2291f5eed..42b83d8c47 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java @@ -26,9 +26,11 @@ import junit.framework.TestCase; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; public class ConflationQueueListTest extends TestCase @@ -46,8 +48,11 @@ public class ConflationQueueListTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class), - Collections.<String,Object>emptyMap(),CONFLATION_KEY); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); + _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes); _list = _queue.getEntries(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index c462468819..91c0136af6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; @@ -131,6 +132,12 @@ public class MockAMQQueue implements AMQQueue return 0; } + @Override + public LifetimePolicy getLifetimePolicy() + { + return null; + } + public int getBindingCountHigh() { return 0; @@ -216,7 +223,7 @@ public class MockAMQQueue implements AMQQueue } @Override - public void addQueueDeleteTask(final Action task) + public void addDeleteTask(final Action task) { } @@ -345,7 +352,7 @@ public class MockAMQQueue implements AMQQueue } @Override - public void removeQueueDeleteTask(final Action task) + public void removeDeleteTask(final Action task) { } @@ -478,6 +485,12 @@ public class MockAMQQueue implements AMQQueue return false; } + @Override + public boolean verifySessionAccess(final AMQSessionModel session) + { + return false; + } + public Exchange getAlternateExchange() { return null; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 3db5d0fb62..0ac4eb8f78 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -26,10 +26,12 @@ import static org.mockito.Mockito.when; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; public class PriorityQueueListTest extends QpidTestCase @@ -45,16 +47,11 @@ public class PriorityQueueListTest extends QpidTestCase protected void setUp() { QueueEntry[] entries = new QueueEntry[PRIORITIES.length]; - - PriorityQueue queue = new PriorityQueue(UUID.randomUUID(), - getName(), - false, - null, - false, - false, - mock(VirtualHost.class), - Collections.<String,Object>emptyMap(), - 10); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.PRIORITIES, 10); + PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), null, queueAttributes); _list = queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 66e4286df7..139e32420f 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -27,11 +27,13 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.lang.reflect.Field; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Mockito.mock; @@ -199,8 +201,10 @@ public abstract class QueueEntryImplTestBase extends TestCase { int numberOfEntries = 5; QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; - StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, - mock(VirtualHost.class), Collections.<String,Object>emptyMap()); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); OrderedQueueEntryList queueEntryList = queue.getEntries(); // create test entries diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java index 1934349a62..956a9fc424 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java @@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.Map; +import java.util.*; import org.apache.log4j.Logger; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.exchange.DirectExchange; @@ -51,10 +51,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase { private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class); @@ -68,7 +64,7 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends private DirectExchange _exchange; private MockConsumer _consumerTarget = new MockConsumer(); private QueueConsumer _consumer; - private Map<String,Object> _arguments = null; + private Map<String,Object> _arguments = Collections.emptyMap(); @Override public void setUp() throws Exception @@ -78,8 +74,12 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, - false, false, false, _arguments); + Map<String,Object> attributes = new HashMap<String, Object>(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, _qname); + attributes.put(Queue.OWNER, _owner); + + _queue = (Q) _virtualHost.createQueue(null, attributes); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); } @@ -104,9 +104,10 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends _queue.stop(); try { - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, - false, _owner, false, - false, false, _arguments); + Map<String,Object> attributes = new HashMap<String, Object>(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + + _queue = (Q) _virtualHost.createQueue(null, attributes); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -115,10 +116,10 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends e.getMessage().contains("name")); } - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), - "differentName", false, - _owner, false, - false, false, _arguments); + Map<String,Object> attributes = new HashMap<String, Object>(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "differentName"); + _queue = (Q) _virtualHost.createQueue(null, attributes); assertNotNull("Queue was not created", _queue); } @@ -1137,15 +1138,17 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends { public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost) { - super(UUIDGenerator.generateRandomUUID(), - "testQueue", - false, - "testOwner", - false, - false, - vhost, - factory, - null); + super(vhost, null, attributes(), factory); + } + + private static Map<String,Object> attributes() + { + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "test"); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + return attributes; } @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index c115af5a38..286c47fdf6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -21,11 +21,15 @@ package org.apache.qpid.server.queue; import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import java.util.HashMap; +import java.util.Map; + public class SimpleAMQQueueThreadPoolTest extends QpidTestCase { @@ -50,8 +54,10 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase try { - SimpleAMQQueue queue = (SimpleAMQQueue) - test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "test"); + AMQQueue queue = test.createQueue(null, attributes); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 9457d59300..7aa546e1b3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Mockito.mock; @@ -38,8 +41,10 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase public void setUp() throws Exception { mockLogging(); - - StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest"); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); queueEntryList = queue.getEntries(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index f8953bb7cc..cfa58f11c4 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -26,9 +26,13 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Matchers.eq; @@ -74,8 +78,15 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueue { mockLogging(); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID,UUID.randomUUID()); + attributes.put(Queue.NAME, getName()); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Queue.SORT_KEY, "KEY"); + // Create test list - _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + _testQueue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() { @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 25d6dbba60..e55875135a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -20,11 +20,15 @@ package org.apache.qpid.server.queue; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import static org.mockito.Matchers.eq; @@ -42,7 +46,15 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase public void setUp() throws Exception { mockLogging(); - SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID,UUID.randomUUID()); + attributes.put(Queue.NAME, getName()); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Queue.SORT_KEY, "KEY"); + + SortedQueue queue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() { @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 2c697cfe2b..fe9273dbc8 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -22,9 +22,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -45,8 +46,11 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ protected void setUp() { oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); - _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class), - Collections.<String,Object>emptyMap()); + + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + _testQueue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); _sqel = _testQueue.getEntries(); for(int i = 1; i <= 100; i++) @@ -86,9 +90,10 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ { if(newList) { - StandardQueue queue = - new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class), - Collections.<String, Object>emptyMap()); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); return queue.getEntries(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index c7b812effe..77886fa15e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -25,18 +25,21 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; -import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; + public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList> { @@ -44,7 +47,12 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry { try { - setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(), false,false, null, getArguments())); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); + queueAttributes.put(Queue.OWNER, "testOwner"); + + setQueue(new StandardQueue(null, null, queueAttributes)); assertNull("Queue was created", getQueue()); } catch (IllegalArgumentException e) @@ -58,7 +66,13 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry public void testAutoDeleteQueue() throws Exception { getQueue().stop(); - setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false, getVirtualHost(), Collections.<String,Object>emptyMap())); + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getQname()); + queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes); + + setQueue(queue); getQueue().setDeleteOnNoConsumers(true); ServerMessage message = createMessage(25l); @@ -75,8 +89,12 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry public void testActiveConsumerCount() throws Exception { - final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, - "testOwner", false, false, getVirtualHost(), null); + + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); + queueAttributes.put(Queue.OWNER, "testOwner"); + final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes); //verify adding an active consumer increases the count final MockConsumer consumer1 = new MockConsumer(); @@ -145,8 +163,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry public void testEnqueueDequeuedEntry() throws Exception { // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false, - "testOwner", false, false, getVirtualHost(), null); + SimpleAMQQueue queue = new DequeuedQueue(getVirtualHost()); // create a consumer MockConsumer consumer = new MockConsumer(); @@ -180,9 +197,11 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry int messageNumber = 4; int dequeueMessageIndex = 1; + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "test"); // create queue with overridden method deliverAsync - StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test", - false, "testOwner", false, false, getVirtualHost(), null) + StandardQueue testQueue = new StandardQueue(getVirtualHost(), null, queueAttributes) { @Override public void deliverAsync(QueueConsumer sub) @@ -249,16 +268,19 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList> { - public DequeuedQueue(final UUID id, - final String queueName, - final boolean durable, - final String owner, - final boolean autoDelete, - final boolean exclusive, - final VirtualHost virtualHost, - final Map<String, Object> arguments) + public DequeuedQueue(VirtualHost virtualHost) + { + super(virtualHost, null, attributes(), new DequeuedQueueEntryListFactory()); + } + + private static Map<String,Object> attributes() { - super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "test"); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + return attributes; } } private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList> diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 8b66e7d82f..c5c68ecf45 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.times; import java.io.File; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -40,6 +41,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; @@ -143,7 +145,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", - org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); + org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()))); } private Map<String,Object> map(Object... vals) @@ -220,7 +222,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map<String, Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -240,7 +242,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -258,7 +260,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map<String, Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -292,8 +294,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -320,8 +320,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); @@ -361,13 +359,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); - when(queue.getOwner()).thenReturn(queueOwner); when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); - if(arguments != null && !arguments.isEmpty()) + final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments); + attributes.put(Queue.NAME, queueName); + if(exclusive) { - when(queue.getAvailableAttributes()).thenReturn(arguments.keySet()); + when(queue.getOwner()).thenReturn(queueOwner); + + attributes.put(Queue.OWNER, queueOwner); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER); + } + when(queue.getAvailableAttributes()).thenReturn(attributes.keySet()); final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class); when(queue.getAttribute(requestedAttribute.capture())).then( new Answer() @@ -377,10 +381,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public Object answer(final InvocationOnMock invocation) throws Throwable { String attrName = requestedAttribute.getValue(); - return arguments.get(attrName); + return attributes.get(attrName); } }); - } return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index fb63fefb88..f082d58d39 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -25,11 +25,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.net.SocketAddress; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -182,8 +184,10 @@ public class BrokerTestHelper public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) throws QpidSecurityException, QueueExistsException { - AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, - false, false, false, Collections.<String, Object>emptyMap()); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, queueName); + AMQQueue queue = virtualHost.createQueue(null, attributes); return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 223e2c5218..1e25aac197 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -117,14 +118,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase - final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class); - final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class); + final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class); _queueFactory = mock(QueueFactory.class); - when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(), - anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( + when(_queueFactory.restoreQueue(attributesArg.capture())).then( new Answer() { @@ -133,8 +131,9 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { final AMQQueue queue = mock(AMQQueue.class); - final String queueName = queueArg.getValue(); - final UUID queueId = idArg.getValue(); + final Map attributes = attributesArg.getValue(); + final String queueName = (String) attributes.get(Queue.NAME); + final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); when(queue.getName()).thenReturn(queueName); when(queue.getId()).thenReturn(queueId); @@ -153,10 +152,10 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return null; } } - ).when(queue).setAlternateExchange(altExchangeArg.capture()); + ).when(queue).setAlternateExchange(altExchangeArg.capture()); - Map args = argsArg.getValue(); - if(args.containsKey(Queue.ALTERNATE_EXCHANGE)) + Map args = attributes; + if (args.containsKey(Queue.ALTERNATE_EXCHANGE)) { final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); final Exchange exchange = _exchangeRegistry.getExchange(exchangeId); @@ -470,7 +469,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString()); } - queue.put(Queue.EXCLUSIVE, false); return queue; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index f7eecc73fc..cee7edf1b7 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -152,14 +153,7 @@ public class MockVirtualHost implements VirtualHost } @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) { return null; } @@ -314,4 +308,52 @@ public class MockVirtualHost implements VirtualHost public void unblock() { } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return 0; + } + + @Override + public long getDefaultAlertRepeatGap() + { + return 0; + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return 0; + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return 0; + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return 0; + } } |