From add5c695d1138bc25bb89cd0e1b1724bf542f676 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 17 Feb 2014 20:19:36 +0000 Subject: Update Queue implementation to better define lifetime and exclusivity policies git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1569102 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/transport/ConnectionEndpoint.java | 2 +- .../qpid/amqp_1_0/transport/SessionEndpoint.java | 36 +- .../store/berkeleydb/upgrade/UpgradeFrom5To6.java | 2 +- .../berkeleydb/upgrade/UpgradeFrom5To6Test.java | 1 + .../server/configuration/QueueConfiguration.java | 67 +- .../apache/qpid/server/filter/FilterSupport.java | 18 +- .../server/logging/subjects/ChannelLogSubject.java | 2 +- .../logging/subjects/ConnectionLogSubject.java | 6 +- .../apache/qpid/server/message/MessageSource.java | 24 +- .../qpid/server/model/ExclusivityPolicy.java | 31 + .../apache/qpid/server/model/LifetimePolicy.java | 6 +- .../org/apache/qpid/server/model/VirtualHost.java | 7 +- .../qpid/server/model/adapter/BindingAdapter.java | 2 +- .../server/model/adapter/ConnectionAdapter.java | 4 +- .../qpid/server/model/adapter/ConsumerAdapter.java | 2 +- .../qpid/server/model/adapter/ExchangeAdapter.java | 4 +- .../qpid/server/model/adapter/QueueAdapter.java | 45 +- .../server/model/adapter/VirtualHostAdapter.java | 56 +- .../qpid/server/protocol/AMQConnectionModel.java | 14 +- .../qpid/server/protocol/AMQSessionModel.java | 5 +- .../org/apache/qpid/server/queue/AMQQueue.java | 21 +- .../apache/qpid/server/queue/AMQQueueFactory.java | 490 +++++---------- .../apache/qpid/server/queue/ConflationQueue.java | 30 +- .../apache/qpid/server/queue/OutOfOrderQueue.java | 11 +- .../apache/qpid/server/queue/PriorityQueue.java | 28 +- .../qpid/server/queue/QueueArgumentsConverter.java | 2 +- .../org/apache/qpid/server/queue/QueueFactory.java | 18 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 698 +++++++++++++++++---- .../org/apache/qpid/server/queue/SortedQueue.java | 30 +- .../apache/qpid/server/queue/StandardQueue.java | 14 +- .../server/security/access/ObjectProperties.java | 10 +- .../store/DurableConfigurationStoreHelper.java | 13 +- .../org/apache/qpid/server/util/Deletable.java | 27 + .../apache/qpid/server/util/MapValueConverter.java | 45 ++ .../server/virtualhost/AbstractVirtualHost.java | 109 +++- .../virtualhost/DefaultUpgraderProvider.java | 49 +- .../qpid/server/virtualhost/ExchangeRecoverer.java | 2 +- .../qpid/server/virtualhost/QueueRecoverer.java | 12 +- .../qpid/server/virtualhost/VirtualHost.java | 26 +- .../configuration/QueueConfigurationTest.java | 23 - .../apache/qpid/server/consumer/MockConsumer.java | 46 +- .../qpid/server/exchange/TopicExchangeTest.java | 45 +- .../logging/subjects/ConnectionLogSubjectTest.java | 7 +- .../qpid/server/queue/AMQQueueFactoryTest.java | 187 +++--- .../qpid/server/queue/ConflationQueueListTest.java | 11 +- .../org/apache/qpid/server/queue/MockAMQQueue.java | 17 +- .../qpid/server/queue/PriorityQueueListTest.java | 19 +- .../qpid/server/queue/QueueEntryImplTestBase.java | 10 +- .../qpid/server/queue/SimpleAMQQueueTestBase.java | 55 +- .../server/queue/SimpleAMQQueueThreadPoolTest.java | 10 +- .../server/queue/SimpleQueueEntryImplTest.java | 9 +- .../server/queue/SortedQueueEntryListTest.java | 13 +- .../qpid/server/queue/SortedQueueEntryTest.java | 14 +- .../server/queue/StandardQueueEntryListTest.java | 17 +- .../qpid/server/queue/StandardQueueTest.java | 60 +- .../AbstractDurableConfigurationStoreTestCase.java | 29 +- .../apache/qpid/server/util/BrokerTestHelper.java | 10 +- .../DurableConfigurationRecovererTest.java | 20 +- .../qpid/server/virtualhost/MockVirtualHost.java | 58 +- .../server/protocol/v0_10/ServerConnection.java | 57 +- .../protocol/v0_10/ServerConnectionDelegate.java | 11 +- .../qpid/server/protocol/v0_10/ServerSession.java | 17 +- .../protocol/v0_10/ServerSessionDelegate.java | 127 +--- .../qpid/server/protocol/v0_8/AMQChannel.java | 63 +- .../server/protocol/v0_8/AMQProtocolEngine.java | 226 ++----- .../server/protocol/v0_8/AMQProtocolSession.java | 23 +- .../v0_8/handler/BasicConsumeMethodHandler.java | 17 +- .../v0_8/handler/BasicGetMethodHandler.java | 16 +- .../protocol/v0_8/handler/QueueBindHandler.java | 13 - .../protocol/v0_8/handler/QueueDeclareHandler.java | 114 +--- .../protocol/v0_8/handler/QueueDeleteHandler.java | 3 +- .../protocol/v0_8/handler/QueuePurgeHandler.java | 4 +- .../protocol/v0_8/InternalTestProtocolSession.java | 2 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 102 +-- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 63 +- .../qpid/server/protocol/v1_0/Session_1_0.java | 148 +++-- .../server/management/amqp/ManagementNode.java | 26 +- .../qpid/server/jmx/mbeans/ExchangeMBean.java | 2 +- .../apache/qpid/server/jmx/mbeans/QueueMBean.java | 24 +- .../server/jmx/mbeans/VirtualHostManagerMBean.java | 9 +- .../qpid/server/jmx/mbeans/QueueMBeanTest.java | 22 +- .../jmx/mbeans/VirtualHostManagerMBeanTest.java | 31 +- .../org/apache/qpid/server/queue/ModelTest.java | 2 +- .../apache/qpid/server/store/MessageStoreTest.java | 31 +- .../management/jmx/QueueManagementTest.java | 5 +- .../java/org/apache/qpid/systest/rest/Asserts.java | 14 +- .../apache/qpid/systest/rest/QueueRestTest.java | 4 +- .../qpid/systest/rest/VirtualHostRestTest.java | 4 +- .../test/unit/client/MaxDeliveryCountTest.java | 9 +- java/test-profiles/Java010Excludes | 6 + java/test-profiles/JavaPre010Excludes | 5 + 91 files changed, 2185 insertions(+), 1644 deletions(-) create mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java create mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 270abef88b..1556876681 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -563,7 +563,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { _receivingSessions[channel] = null; - endpoint.end(end); + endpoint.receiveEnd(end); } else { diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index c37c52c6ea..c9212b1a1e 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -153,19 +153,47 @@ public class SessionEndpoint public void end() { - end(null); + end(new End()); } - public void end(final End end) + public void end(End end) { synchronized(getLock()) { switch(_state) { case BEGIN_SENT: - _connection.sendEnd(getSendingChannel(), new End(), false); + _connection.sendEnd(getSendingChannel(), end, false); _state = SessionState.END_PIPE; break; + case ACTIVE: + detachLinks(); + short sendChannel = getSendingChannel(); + _connection.sendEnd(sendChannel, end, true); + _state = SessionState.END_SENT; + break; + default: + sendChannel = getSendingChannel(); + End reply = new End(); + Error error = new Error(); + error.setCondition(AmqpError.ILLEGAL_STATE); + error.setDescription("END called on Session which has not been opened"); + reply.setError(error); + _connection.sendEnd(sendChannel, reply, true); + break; + + + } + getLock().notifyAll(); + } + } + + public void receiveEnd(final End end) + { + synchronized(getLock()) + { + switch(_state) + { case END_SENT: _state = SessionState.ENDED; break; @@ -174,7 +202,7 @@ public class SessionEndpoint _sessionEventListener.remoteEnd(end); short sendChannel = getSendingChannel(); _connection.sendEnd(sendChannel, new End(), true); - _state = end == null ? SessionState.END_SENT : SessionState.ENDED; + _state = SessionState.ENDED; break; default: sendChannel = getSendingChannel(); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 9b1ab7057b..46f2afd741 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -568,7 +568,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade Map attributesMap = new HashMap(); attributesMap.put(Exchange.NAME, exchangeName); attributesMap.put(Exchange.TYPE, exchangeType); - attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name() + attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? "AUTO_DELETE" : LifetimePolicy.PERMANENT.name()); String json = _serializer.serialize(attributesMap); UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 44f0861275..02494009c1 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -44,6 +44,7 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 25466d9c55..4e9d94c030 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.configuration; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; @@ -43,7 +45,6 @@ public class QueueConfiguration extends AbstractConfiguration CompositeConfiguration mungedConf = new CompositeConfiguration(); mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name))); - mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues")); setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf); } @@ -85,19 +86,33 @@ public class QueueConfiguration extends AbstractConfiguration return _vHostConfig; } + private boolean getDefaultedBoolean(String attribute) + { + final Configuration config = _vHostConfig.getConfig(); + if(config.containsKey("queues."+attribute)) + { + final boolean defaultValue = config.getBoolean("queues." + attribute); + return getBooleanValue(attribute, defaultValue); + } + else + { + return getBooleanValue(attribute); + } + } + public boolean getDurable() { - return getBooleanValue("durable"); + return getDefaultedBoolean("boolean"); } public boolean getExclusive() { - return getBooleanValue("exclusive"); + return getDefaultedBoolean("exclusive"); } public boolean getAutoDelete() { - return getBooleanValue("autodelete"); + return getDefaultedBoolean("autodelete"); } public String getOwner() @@ -107,17 +122,41 @@ public class QueueConfiguration extends AbstractConfiguration public boolean getPriority() { - return getBooleanValue("priority"); + return getDefaultedBoolean("priority"); } public int getPriorities() { - return getIntValue("priorities", -1); + final Configuration config = _vHostConfig.getConfig(); + + int defaultValue; + if(config.containsKey("queues.priorities")) + { + defaultValue = config.getInt("queues.priorities"); + } + else + { + defaultValue = -1; + } + return getIntValue("priorities", defaultValue); } public String getExchange() { - return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME); + final Configuration config = _vHostConfig.getConfig(); + + String defaultValue; + + if(config.containsKey("queues.exchange")) + { + defaultValue = config.getString("queues.exchange"); + } + else + { + defaultValue = ""; + } + + return getStringValue("exchange", defaultValue); } public List getRoutingKeys() @@ -137,37 +176,37 @@ public class QueueConfiguration extends AbstractConfiguration public int getMaximumMessageAge() { - return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge()); + return getIntValue("maximumMessageAge"); } public long getMaximumQueueDepth() { - return getLongValue("maximumQueueDepth", _vHostConfig.getMaximumQueueDepth()); + return getLongValue("maximumQueueDepth"); } public long getMaximumMessageSize() { - return getLongValue("maximumMessageSize", _vHostConfig.getMaximumMessageSize()); + return getLongValue("maximumMessageSize"); } public long getMaximumMessageCount() { - return getLongValue("maximumMessageCount", _vHostConfig.getMaximumMessageCount()); + return getLongValue("maximumMessageCount"); } public long getMinimumAlertRepeatGap() { - return getLongValue("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); + return getLongValue("minimumAlertRepeatGap"); } public long getCapacity() { - return getLongValue("capacity", _vHostConfig.getCapacity()); + return getLongValue("capacity"); } public long getFlowResumeCapacity() { - return getLongValue("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + return getLongValue("flowResumeCapacity"); } public boolean isLVQ() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 9870551313..ea0faf132e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import java.lang.ref.WeakReference; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; @@ -29,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -120,19 +122,25 @@ public class FilterSupport public static final class NoLocalFilter implements MessageFilter { - private final MessageSource _queue; + private final MessageSource _queue; - public NoLocalFilter(MessageSource queue) + private NoLocalFilter(MessageSource queue) { _queue = queue; } public boolean matches(Filterable message) { - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || - exclusiveOwningSession.getConnectionReference() != message.getConnectionReference(); + final Collection consumers = _queue.getConsumers(); + for(Consumer c : consumers) + { + if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) + { + return false; + } + } + return !consumers.isEmpty(); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 5b0e34b73e..dcbb693cf1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -46,7 +46,7 @@ public class ChannelLogSubject extends AbstractLogSubject AMQConnectionModel connection = session.getConnectionModel(); setLogStringWithFormat(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(), - (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(), + (connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(), (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(), (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(), session.getChannelId()); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java index 87c2377e0f..b1b8bc50ed 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -55,7 +55,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { if (!_upToDate) { - if (_session.getPrincipalAsString() != null) + if (_session.getAuthorizedPrincipal() != null) { if (_session.getVirtualHostName() != null) { @@ -71,7 +71,7 @@ public class ConnectionLogSubject extends AbstractLogSubject */ setLogString("[" + MessageFormat.format(CONNECTION_FORMAT, _session.getConnectionId(), - _session.getPrincipalAsString(), + _session.getAuthorizedPrincipal().getName(), _session.getRemoteAddressString(), _session.getVirtualHostName()) + "] "); @@ -82,7 +82,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { setLogString("[" + MessageFormat.format(USER_FORMAT, _session.getConnectionId(), - _session.getPrincipalAsString(), + _session.getAuthorizedPrincipal().getName(), _session.getRemoteAddressString()) + "] "); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index b947bc0ef6..33c8e26790 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -36,7 +36,8 @@ public interface MessageSource> C addConsumer(T target, FilterManager filters, Class messageClass, String consumerName, EnumSet options) - throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException; + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException, + ConsumerAccessRefused; Collection getConsumers(); @@ -44,16 +45,10 @@ public interface MessageSource> void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); - AuthorizationHolder getAuthorizationHolder(); - - void setAuthorizationHolder(AuthorizationHolder principalHolder); - - void setExclusiveOwningSession(AMQSessionModel owner); - - AMQSessionModel getExclusiveOwningSession(); - boolean isExclusive(); + boolean verifySessionAccess(AMQSessionModel session); + interface ConsumerRegistrationListener> { void consumerAdded(Q source, Consumer consumer); @@ -76,7 +71,6 @@ public interface MessageSource> public ExistingExclusiveConsumer() { - super(""); } } @@ -95,7 +89,15 @@ public interface MessageSource> { public ExistingConsumerPreventsExclusive() { - super(""); } } + + static final class ConsumerAccessRefused extends Exception + { + public ConsumerAccessRefused() + { + } + } + + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java new file mode 100644 index 0000000000..a559a69ab5 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum ExclusivityPolicy +{ + NONE, + SESSION, + CONNECTION, + CONTAINER, + PRINCIPAL, + LINK +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java index c9006f4e71..1e45f8f493 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java @@ -23,5 +23,9 @@ package org.apache.qpid.server.model; public enum LifetimePolicy { PERMANENT, - AUTO_DELETE + DELETE_ON_CONNECTION_CLOSE, + DELETE_ON_SESSION_END, + DELETE_ON_NO_OUTBOUND_LINKS, + DELETE_ON_NO_LINKS, + IN_USE } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index f0241f8b30..dd3e9e4b61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -160,7 +160,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); - int CURRENT_CONFIG_VERSION = 3; + int CURRENT_CONFIG_VERSION = 4; //children Collection getAliases(); @@ -172,9 +172,8 @@ public interface VirtualHost extends ConfiguredObject LifetimePolicy lifetime, long ttl, String type, Map attributes) throws AccessControlException, IllegalArgumentException; - Queue createQueue(String name, State initialState, boolean durable, - boolean exclusive, LifetimePolicy lifetime, long ttl, Map attributes) - throws AccessControlException, IllegalArgumentException; + Queue createQueue(Map attributes) + throws AccessControlException, IllegalArgumentException; Collection getExchangeTypes(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index 88475809c9..265d4318f1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -170,7 +170,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding } else if(LIFETIME_POLICY.equals(name)) { - return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE || _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT || _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT ? LifetimePolicy.IN_USE : LifetimePolicy.PERMANENT; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index dda23f1cfc..3cb6493338 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.model.adapter; import java.security.AccessControlException; +import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -191,7 +192,8 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } else if(name.equals(PRINCIPAL)) { - return _connection.getPrincipalAsString(); + final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal(); + return authorizedPrincipal == null ? null : authorizedPrincipal.getName(); } else if(name.equals(PROPERTIES)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index cf6874030b..7935077a40 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -130,7 +130,7 @@ public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid. } else if(LIFETIME_POLICY.equals(name)) { - return LifetimePolicy.AUTO_DELETE; + return LifetimePolicy.DELETE_ON_SESSION_END; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index f7ecbd323a..d7b6b8bb75 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -201,7 +201,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa public LifetimePolicy getLifetimePolicy() { - return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; } public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired) @@ -330,7 +330,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } else if(LIFETIME_POLICY.equals(name)) { - return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; } else if(TIME_TO_LIVE.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 733b6cecc2..5d09cfa8e2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -31,15 +31,7 @@ import java.util.Map; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ConfiguredObjectFinder; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.IllegalStateTransitionException; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.QueueNotificationListener; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; @@ -62,7 +54,6 @@ final class QueueAdapter> extends AbstractAdapter impl put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class); put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class); put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); - put(EXCLUSIVE, Boolean.class); put(DESCRIPTION, String.class); }}); @@ -208,7 +199,7 @@ final class QueueAdapter> extends AbstractAdapter impl public LifetimePolicy getLifetimePolicy() { - return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy(); } public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired) @@ -274,9 +265,33 @@ final class QueueAdapter> extends AbstractAdapter impl } else if(EXCLUSIVE.equals(name)) { - Boolean exclusiveFlag = (Boolean) desired; - _queue.setExclusive(exclusiveFlag); + ExclusivityPolicy desiredPolicy; + if(desired == null) + { + desiredPolicy = ExclusivityPolicy.NONE; + } + else if(desired instanceof ExclusivityPolicy) + { + desiredPolicy = (ExclusivityPolicy)desired; + } + else if (desired instanceof String) + { + desiredPolicy = ExclusivityPolicy.valueOf((String)desired); + } + else + { + throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName()); + } + try + { + _queue.setExclusivityPolicy(desiredPolicy); + } + catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive) + { + throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this"); + } return true; + } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -376,7 +391,7 @@ final class QueueAdapter> extends AbstractAdapter impl } else if(EXCLUSIVE.equals(name)) { - return _queue.isExclusive(); + return _queue.getAttribute(Queue.EXCLUSIVE); } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -458,7 +473,7 @@ final class QueueAdapter> extends AbstractAdapter impl } else if(LIFETIME_POLICY.equals(name)) { - return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT; + return _queue.getLifetimePolicy(); } else if(NAME.equals(name)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 17ce69da4a..4cd7432f75 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.model.adapter; import java.io.File; import java.lang.reflect.Type; import java.security.AccessControlException; -import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -58,18 +57,16 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueType; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; @@ -356,7 +353,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual name, type, durable, - lifetime == LifetimePolicy.AUTO_DELETE, + lifetime != null && lifetime != LifetimePolicy.PERMANENT, alternateExchange); synchronized (_exchangeAdapters) { @@ -389,7 +386,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public Queue createQueue(Map attributes) throws AccessControlException, IllegalArgumentException { - attributes = new HashMap(attributes); + checkVHostStateIsActive(); if (attributes.containsKey(Queue.QUEUE_TYPE)) { @@ -405,7 +402,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); } else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) { @@ -417,51 +414,12 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } - String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null); - State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); - boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); - LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); - long ttl = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l); - boolean exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false); - - attributes.remove(Queue.NAME); - attributes.remove(Queue.STATE); - attributes.remove(Queue.DURABLE); - attributes.remove(Queue.LIFETIME_POLICY); - attributes.remove(Queue.TIME_TO_LIVE); - - return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes); - } - - public Queue createQueue(final String name, - final State initialState, - final boolean durable, - boolean exclusive, - final LifetimePolicy lifetime, - final long ttl, - final Map attributes) - throws AccessControlException, IllegalArgumentException - { - checkVHostStateIsActive(); - - String owner = null; - if(exclusive) - { - Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject()); - if(authenticatedPrincipal != null) - { - owner = authenticatedPrincipal.getName(); - } - } - final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE; try { - AMQQueue queue = - _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, - durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes); + AMQQueue queue = _virtualHost.createQueue(null, attributes); synchronized (_queueAdapters) { @@ -471,15 +429,15 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } catch(QueueExistsException qe) { - throw new IllegalArgumentException("Queue with name "+name+" already exists"); + throw new IllegalArgumentException("Queue with name "+MapValueConverter.getStringAttribute(Queue.NAME,attributes)+" already exists"); } catch (QpidSecurityException e) { throw new AccessControlException(e.toString()); } - } + public String getName() { return (String)getAttribute(NAME); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index bc272f13dc..623cf62472 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -25,11 +25,13 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.util.Deletable; +import java.security.Principal; import java.util.List; import java.util.UUID; -public interface AMQConnectionModel extends StatisticsGatherer +public interface AMQConnectionModel, S extends AMQSessionModel> extends StatisticsGatherer, Deletable { /** * Close the underlying Connection @@ -50,7 +52,7 @@ public interface AMQConnectionModel extends StatisticsGatherer * @param cause * @param message */ - public void closeSession(AMQSessionModel session, AMQConstant cause, String message); + public void closeSession(S session, AMQConstant cause, String message); public long getConnectionId(); @@ -59,15 +61,13 @@ public interface AMQConnectionModel extends StatisticsGatherer * * @return a list of {@link AMQSessionModel}s */ - public List getSessionModels(); + public List getSessionModels(); /** * Return a {@link LogSubject} for the connection. */ public LogSubject getLogSubject(); - public String getUserName(); - public boolean isSessionNameUnique(byte[] name); String getRemoteAddressString(); @@ -78,7 +78,7 @@ public interface AMQConnectionModel extends StatisticsGatherer String getClientProduct(); - String getPrincipalAsString(); + Principal getAuthorizedPrincipal(); long getSessionCountLimit(); @@ -93,4 +93,6 @@ public interface AMQConnectionModel extends StatisticsGatherer boolean isStopped(); String getVirtualHostName(); + + String getRemoteContainerName(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 520489ab1f..db92f6950d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -26,17 +26,18 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.util.Deletable; /** * Session model interface. * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}. */ -public interface AMQSessionModel extends Comparable +public interface AMQSessionModel, C extends AMQConnectionModel> extends Comparable, Deletable { public UUID getId(); - public AMQConnectionModel getConnectionModel(); + public C getConnectionModel(); public String getClientID(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 237f2cd8aa..aceebc722b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -27,10 +27,13 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -38,9 +41,12 @@ import java.util.List; import java.util.Set; public interface AMQQueue, Q extends AMQQueue, C extends Consumer> - extends Comparable, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker, MessageDestination + extends Comparable, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker, MessageDestination, + Deletable { + void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive; + public interface NotificationListener { void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); @@ -66,9 +72,7 @@ public interface AMQQueue, Q extends AMQQueue long getTotalEnqueueCount(); - void setNoLocal(boolean b); - - boolean isAutoDelete(); + LifetimePolicy getLifetimePolicy(); String getOwner(); @@ -104,11 +108,6 @@ public interface AMQQueue, Q extends AMQQueue boolean resend(final E entry, final C consumer); - void addQueueDeleteTask(Action task); - void removeQueueDeleteTask(Action task); - - - List getMessagesOnTheQueue(); List getMessagesOnTheQueue(int num); @@ -189,10 +188,6 @@ public interface AMQQueue, Q extends AMQQueue Collection getAvailableAttributes(); Object getAttribute(String attrName); - void configure(QueueConfiguration config); - - void setExclusive(boolean exclusive); - /** * Gets the maximum delivery count. If a message on this queue * is delivered more than maximumDeliveryCount, the message will be diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 409c528a4b..5003db1385 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.server.queue; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; @@ -36,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -45,7 +48,6 @@ import org.apache.qpid.server.virtualhost.QueueExistsException; public class AMQQueueFactory implements QueueFactory { - public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; @@ -59,381 +61,207 @@ public class AMQQueueFactory implements QueueFactory { _virtualHost = virtualHost; _queueRegistry = queueRegistry; + } + + @Override + public AMQQueue restoreQueue(Map attributes) throws QpidSecurityException + { + return createOrRestoreQueue(null, attributes, false); + } - private abstract static class QueueProperty + @Override + public AMQQueue createQueue(final AMQSessionModel creatingSession, + Map attributes) throws QpidSecurityException + { + return createOrRestoreQueue(creatingSession, attributes, true); + } + + private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map attributes, + boolean createInStore) throws QpidSecurityException { - private final String _argumentName; + String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - public QueueProperty(String argumentName) + QueueConfiguration config = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE) && config.getMaximumMessageAge() != 0) { - _argumentName = argumentName; + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, config.getMaximumMessageAge()); } - - public String getArgumentName() + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) && config.getMaximumQueueDepth() != 0) { - return _argumentName; + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, config.getMaximumQueueDepth()); } - - - public abstract void setPropertyValue(AMQQueue queue, Object value); - - } - - private abstract static class QueueLongProperty extends QueueProperty - { - - public QueueLongProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) && config.getMaximumMessageSize() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, config.getMaximumMessageSize()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) && config.getMaximumMessageCount() != 0) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).longValue()); - } - + attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, config.getMaximumMessageCount()); } - - abstract void setPropertyValue(AMQQueue queue, long value); - - - } - - private abstract static class QueueIntegerProperty extends QueueProperty - { - public QueueIntegerProperty(String argumentName) + if (!attributes.containsKey(Queue.ALERT_REPEAT_GAP) && config.getMinimumAlertRepeatGap() != 0) { - super(argumentName); + attributes.put(Queue.ALERT_REPEAT_GAP, config.getMinimumAlertRepeatGap()); } - - public void setPropertyValue(AMQQueue queue, Object value) + if (config.getMaxDeliveryCount() != 0 && !attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) { - if(value instanceof Number) - { - setPropertyValue(queue, ((Number)value).intValue()); - } - + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, config.getMaxDeliveryCount()); } - abstract void setPropertyValue(AMQQueue queue, int value); - } - - private static final QueueProperty[] DECLARABLE_PROPERTIES = { - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageAge(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageSize(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumMessageCount(value); - } - }, - new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMaximumQueueDepth(value); - } - }, - new QueueLongProperty(Queue.ALERT_REPEAT_GAP) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setMinimumAlertRepeatGap(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setCapacity(value); - } - }, - new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) - { - public void setPropertyValue(AMQQueue queue, long value) - { - queue.setFlowResumeCapacity(value); - } - }, - new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) - { - public void setPropertyValue(AMQQueue queue, int value) - { - queue.setMaximumDeliveryCount(value); - } - } - }; - - @Override - public AMQQueue restoreQueue(UUID id, - String queueName, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false); - - } - - /** - * @param id the id to use. - * @param deleteOnNoConsumer - */ - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) throws QpidSecurityException - { - return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true); - } - - private AMQQueue createOrRestoreQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments, - boolean createInStore) throws QpidSecurityException - { - if (id == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) && config.getCapacity() != 0) { - throw new IllegalArgumentException("Queue id must not be null"); + attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, config.getCapacity()); } - if (queueName == null) + if (!attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) && config.getFlowResumeCapacity() != 0) { - throw new IllegalArgumentException("Queue name must not be null"); + attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, config.getFlowResumeCapacity()); } - QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); - - boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + boolean createDLQ = createDLQ(attributes, config); if (createDLQ) { validateDLNames(queueName); } - int priorities = 1; - String conflationKey = null; - String sortingKey = null; - - if(arguments != null) - { - if(arguments.containsKey(Queue.LVQ_KEY)) - { - conflationKey = (String) arguments.get(Queue.LVQ_KEY); - if(conflationKey == null) - { - conflationKey = QPID_DEFAULT_LVQ_KEY; - } - } - else if(arguments.containsKey(Queue.PRIORITIES)) - { - Object prioritiesObj = arguments.get(Queue.PRIORITIES); - if(prioritiesObj instanceof Number) - { - priorities = ((Number)prioritiesObj).intValue(); - } - else if(prioritiesObj instanceof String) - { - try - { - priorities = Integer.parseInt(prioritiesObj.toString()); - } - catch (NumberFormatException e) - { - // TODO - should warn here of invalid format - } - } - else - { - // TODO - should warn here of invalid format - } - } - else if(arguments.containsKey(Queue.SORT_KEY)) - { - sortingKey = (String)arguments.get(Queue.SORT_KEY); - } - } + AMQQueue queue; - AMQQueue q; - if(sortingKey != null) + if(attributes.containsKey(Queue.SORT_KEY)) { - q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); + queue = new SortedQueue(_virtualHost, creatingSession, attributes); } - else if(conflationKey != null) + else if(attributes.containsKey(Queue.LVQ_KEY)) { - q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); + queue = new ConflationQueue(_virtualHost, creatingSession, attributes); } - else if(priorities > 1) + else if(attributes.containsKey(Queue.PRIORITIES)) { - q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); + queue = new PriorityQueue(_virtualHost, creatingSession, attributes); } else { - q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); + queue = new StandardQueue(_virtualHost, creatingSession, attributes); } - q.setDeleteOnNoConsumers(deleteOnNoConsumer); - //Register the new queue - _queueRegistry.registerQueue(q); + _queueRegistry.registerQueue(queue); - q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); - - if(arguments != null) + if(createDLQ) { - for(QueueProperty p : DECLARABLE_PROPERTIES) - { - if(arguments.containsKey(p.getArgumentName())) - { - p.setPropertyValue(q, arguments.get(p.getArgumentName())); - } - } - - if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) - { - q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); - } - + createDLQ(queue); } - - if(createDLQ) + else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) { - final String dlExchangeName = getDeadLetterExchangeName(queueName); - final String dlQueueName = getDeadLetterQueueName(queueName); - - Exchange dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; try { - dlExchange = _virtualHost.createExchange(dlExchangeId, - dlExchangeName, - ExchangeDefaults.FANOUT_EXCHANGE_CLASS, - true, false, null); - } - catch(ExchangeExistsException e) - { - // We're ok if the exchange already exists - dlExchange = e.getExistingExchange(); - } - catch (ReservedExchangeNameException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (AMQUnknownExchangeType e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); } - catch (UnknownExchangeException e) + catch(IllegalArgumentException e) { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + altExchange = _virtualHost.getExchange(altExchangeAttr); } + queue.setAlternateExchange(altExchange); + } - AMQQueue dlQueue = null; + if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue); + } - synchronized(_queueRegistry) - { - dlQueue = _queueRegistry.getQueue(dlQueueName); + return queue; + } - if(dlQueue == null) - { - //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc - final Map args = new HashMap(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); - args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - - try - { - dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, - false, args); - } - catch (QueueExistsException e) - { - throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + - "queue already exists, however this occurred within " + - "a block where the queue existence had previously been " + - "checked, and no queue creation should have been " + - "possible from another thread", e); - } - } - } + private void createDLQ(final AMQQueue queue) throws QpidSecurityException + { + final String queueName = queue.getName(); + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - //ensure the queue is bound to the exchange - if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) - { - //actual routing key used does not matter due to use of fanout exchange, - //but we will make the key 'dlq' as it can be logged at creation. - dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); - } - q.setAlternateExchange(dlExchange); + Exchange dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); + + try + { + dlExchange = _virtualHost.createExchange(dlExchangeId, + dlExchangeName, + ExchangeDefaults.FANOUT_EXCHANGE_CLASS, + true, false, null); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + catch (ReservedExchangeNameException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); } - else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + catch (AMQUnknownExchangeType e) { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (UnknownExchangeException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } - final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); - Exchange altExchange; - try - { - altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) + AMQQueue dlQueue = null; + + synchronized(_queueRegistry) + { + dlQueue = _queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) { - altExchange = _virtualHost.getExchange(altExchangeAttr); + //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc + final Map args = new HashMap(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + try + { + + + args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName())); + args.put(Queue.NAME, dlQueueName); + args.put(Queue.DURABLE, true); + dlQueue = _virtualHost.createQueue(null, args); + } + catch (QueueExistsException e) + { + throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + + "queue already exists, however this occurred within " + + "a block where the queue existence had previously been " + + "checked, and no queue creation should have been " + + "possible from another thread", e); + } } - q.setAlternateExchange(altExchange); } - if (createInStore && q.isDurable() && !q.isAutoDelete()) + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); } - - return q; + queue.setAlternateExchange(dlExchange); } public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws QpidSecurityException { - String queueName = config.getName(); - - boolean durable = config.getDurable(); - boolean autodelete = config.getAutoDelete(); - boolean exclusive = config.getExclusive(); - String owner = config.getOwner(); - Map arguments = createQueueArgumentsFromConfig(config); - - // we need queues that are defined in config to have deterministic ids. - UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments); - q.configure(config); + Map arguments = createQueueAttributesFromConfig(_virtualHost, config); + + AMQQueue q = createOrRestoreQueue(null, arguments, false); return q; } @@ -471,16 +299,19 @@ public class AMQQueueFactory implements QueueFactory /** * Checks if DLQ is enabled for the queue. * - * @param autoDelete - * queue auto-delete flag * @param arguments * queue arguments * @param qConfig * queue configuration * @return true if DLQ enabled */ - protected static boolean createDLQ(boolean autoDelete, Map arguments, QueueConfiguration qConfig) + protected static boolean createDLQ(Map arguments, QueueConfiguration qConfig) { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + arguments, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + //feature is not to be enabled for temporary queues or when explicitly disabled by argument if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { @@ -525,46 +356,59 @@ public class AMQQueueFactory implements QueueFactory return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); } - private static Map createQueueArgumentsFromConfig(QueueConfiguration config) + private static Map createQueueAttributesFromConfig(final VirtualHost virtualHost, + QueueConfiguration config) { - Map arguments = new HashMap(); + Map attributes = new HashMap(); if(config.getArguments() != null && !config.getArguments().isEmpty()) { - arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap(config.getArguments()))); + attributes.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap(config.getArguments()))); } if(config.isLVQ() || config.getLVQKey() != null) { - arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); + attributes.put(Queue.LVQ_KEY, + config.getLVQKey() == null ? ConflationQueue.DEFAULT_LVQ_KEY : config.getLVQKey()); } else if (config.getPriority() || config.getPriorities() > 0) { - arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + attributes.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); } else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) { - arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); + attributes.put(Queue.SORT_KEY, config.getQueueSortKey()); } if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); } if (config.getDescription() != null && !"".equals(config.getDescription())) { - arguments.put(Queue.DESCRIPTION, config.getDescription()); + attributes.put(Queue.DESCRIPTION, config.getDescription()); } - if (arguments.isEmpty()) + attributes.put(Queue.DURABLE, config.getDurable()); + attributes.put(Queue.LIFETIME_POLICY, + config.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT); + if(config.getExclusive()) { - return Collections.emptyMap(); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER); } - else + if(config.getOwner() != null) { - return arguments; + attributes.put(Queue.OWNER, config.getOwner()); } + + attributes.put(Queue.NAME, config.getName()); + + // we need queues that are defined in config to have deterministic ids. + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(config.getName(), virtualHost.getName())); + + + return attributes; } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index a1ff51959c..f350368634 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -22,22 +22,32 @@ package org.apache.qpid.server.queue; import java.util.Map; -import java.util.UUID; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; public class ConflationQueue extends SimpleAMQQueue { - protected ConflationQueue(UUID id, - String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - Map args, String conflationKey) + public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + + + protected ConflationQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, Map attributes) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); + super(virtualHost, creatingSession, attributes, entryList(attributes)); + } + + private static ConflationQueueList.Factory entryList(final Map attributes) + { + + String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY, + attributes, + DEFAULT_LVQ_KEY); + + // conflation key can still be null if it was present in the map with a null value + return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); } public String getConflationKey() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 7cf245c8f8..c9ac3f3621 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,19 +20,20 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public abstract class OutOfOrderQueue, Q extends OutOfOrderQueue, L extends SimpleQueueEntryList> extends SimpleAMQQueue { - protected OutOfOrderQueue(UUID id, String name, boolean durable, - String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map arguments) + protected OutOfOrderQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map attributes, + QueueEntryListFactory entryListFactory) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + super(virtualHost, creatingSession, attributes, entryListFactory); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 4440d045d1..1e41a0fb3e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -20,23 +20,31 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class PriorityQueue extends OutOfOrderQueue { - protected PriorityQueue(UUID id, - final String name, - final boolean durable, - final String owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - Map arguments, int priorities) + + public static final int DEFAULT_PRIORITY_LEVELS = 10; + + protected PriorityQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map attributes) + { + super(virtualHost, creatingSession, attributes, entryList(attributes)); + } + + private static PriorityQueueList.Factory entryList(final Map attributes) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); + final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes, + DEFAULT_PRIORITY_LEVELS); + + return new PriorityQueueList.Factory(priorities); } public int getPriorities() diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 589f385d22..49123f8412 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -109,7 +109,7 @@ public class QueueArgumentsConverter } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) { - modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); } if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 8c0386c7b4..62a2d93b0f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -22,25 +22,15 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.UUID; + +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.QpidSecurityException; public interface QueueFactory { - AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, + AMQQueue createQueue(final AMQSessionModel creatingSession, Map arguments) throws QpidSecurityException; - AMQQueue restoreQueue(UUID id, - String queueName, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) throws QpidSecurityException; + AMQQueue restoreQueue(Map arguments) throws QpidSecurityException; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b2408f6dfa..aa7025e068 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.server.queue; +import java.security.Principal; import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; @@ -28,11 +29,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; @@ -50,12 +54,16 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -78,19 +86,10 @@ abstract class SimpleAMQQueue, Q extends SimpleA private final String _name; /** null means shared */ - private final String _owner; - - private AuthorizationHolder _authorizationHolder; - - private boolean _exclusive = false; - private AMQSessionModel _exclusiveOwner; - + private String _description; private final boolean _durable; - /** If true, this queue is deleted when the last subscriber is removed */ - private final boolean _autoDelete; - private Exchange _alternateExchange; @@ -142,6 +141,10 @@ abstract class SimpleAMQQueue, Q extends SimpleA private long _flowResumeCapacity; + private ExclusivityPolicy _exclusivityPolicy; + private LifetimePolicy _lifetimePolicy; + private Object _exclusiveOwner; // could be connection, session or Principal + private final Set _notificationChecks = EnumSet.noneOf(NotificationCheck.class); @@ -157,7 +160,8 @@ abstract class SimpleAMQQueue, Q extends SimpleA private final Set _blockedChannels = new ConcurrentSkipListSet(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List> _deleteTaskList = new CopyOnWriteArrayList>(); + private final List> _deleteTaskList = + new CopyOnWriteArrayList>(); private LogSubject _logSubject; @@ -184,16 +188,98 @@ abstract class SimpleAMQQueue, Q extends SimpleA private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - - protected SimpleAMQQueue(UUID id, - String name, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, Map arguments) + protected SimpleAMQQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map attributes, + QueueEntryListFactory entryListFactory) { + UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); + + + _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, + Queue.EXCLUSIVE, + attributes, + ExclusivityPolicy.NONE); + _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT); + if(creatingSession != null) + { + + switch(_exclusivityPolicy) + { + + case PRINCIPAL: + _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal(); + break; + case CONTAINER: + _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName(); + break; + case CONNECTION: + _exclusiveOwner = creatingSession.getConnectionModel(); + addExclusivityConstraint(creatingSession.getConnectionModel()); + break; + case SESSION: + _exclusiveOwner = creatingSession; + addExclusivityConstraint(creatingSession); + break; + case NONE: + case LINK: + // nothing to do as if link no link associated until there is a consumer associated + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy: " + + _exclusivityPolicy + + " this is a coding error inside Qpid"); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = new AuthenticatedPrincipal(owner); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = owner; + } + } + + + if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession.getConnectionModel()); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } + else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END) + { + if(creatingSession != null) + { + addLifetimeConstraint(creatingSession); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } if (name == null) { @@ -207,12 +293,18 @@ abstract class SimpleAMQQueue, Q extends SimpleA _name = name; _durable = durable; - _owner = owner; - _autoDelete = autoDelete; - _exclusive = exclusive; _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList((Q)this); - _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap() : new LinkedHashMap(arguments)); + _entries = entryListFactory.createQueueEntryList((Q) this); + final LinkedHashMap arguments = new LinkedHashMap(attributes); + + arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); + + _arguments = Collections.synchronizedMap(arguments); + _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); + + _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); + _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); @@ -220,30 +312,113 @@ abstract class SimpleAMQQueue, Q extends SimpleA _logSubject = new QueueLogSubject(this); _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); + + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) + { + setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); + } + else + { + setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) + { + setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); + } + else + { + setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) + { + setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + attributes)); + } + else + { + setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) + { + setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + attributes)); + } + else + { + setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes()); + } + if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) + { + setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); + } + else + { + setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) + { + setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); + } + else + { + setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) + { + setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); + } + else + { + setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes()); + } + if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) + { + setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); + } + else + { + setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts()); + } + + final String ownerString; + switch(_exclusivityPolicy) + { + case PRINCIPAL: + ownerString = ((Principal) _exclusiveOwner).getName(); + break; + case CONTAINER: + ownerString = (String) _exclusiveOwner; + break; + default: + ownerString = null; + + } + // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(String.valueOf(_owner), + QueueMessages.CREATED(ownerString, _entries.getPriorities(), - _owner != null, - autoDelete, - durable, !durable, + ownerString != null , + _lifetimePolicy != LifetimePolicy.PERMANENT, + durable, + !durable, _entries.getPriorities() > 0)); - if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) + if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); + Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), + new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -256,6 +431,38 @@ abstract class SimpleAMQQueue, Q extends SimpleA } + private void addLifetimeConstraint(final Deletable lifetimeObject) + { + final Action deleteQueueTask = new Action() + { + @Override + public void performAction(final Deletable object) + { + try + { + getVirtualHost().removeQueue(SimpleAMQQueue.this); + } + catch (QpidSecurityException e) + { + throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " + + "lifetime was tied to an object being deleted"); + } + } + }; + + lifetimeObject.addDeleteTask(deleteQueueTask); + addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask)); + } + + private void addExclusivityConstraint(final Deletable lifetimeObject) + { + final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject); + final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction); + clearOwnerAction.setDeleteTask(deleteDeleteTask); + lifetimeObject.addDeleteTask(clearOwnerAction); + addDeleteTask(deleteDeleteTask); + } + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. @@ -303,12 +510,7 @@ abstract class SimpleAMQQueue, Q extends SimpleA public boolean isExclusive() { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; + return _exclusivityPolicy != ExclusivityPolicy.NONE; } public Exchange getAlternateExchange() @@ -342,27 +544,27 @@ abstract class SimpleAMQQueue, Q extends SimpleA return _arguments.get(attrName); } - public boolean isAutoDelete() + @Override + public LifetimePolicy getLifetimePolicy() { - return _autoDelete; + return _lifetimePolicy; } public String getOwner() { - return _owner; - } - - public AuthorizationHolder getAuthorizationHolder() - { - return _authorizationHolder; - } - - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) - { - _authorizationHolder = authorizationHolder; + if(_exclusiveOwner != null) + { + switch(_exclusivityPolicy) + { + case CONTAINER: + return (String) _exclusiveOwner; + case PRINCIPAL: + return ((Principal)_exclusiveOwner).getName(); + } + } + return null; } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -381,7 +583,9 @@ abstract class SimpleAMQQueue, Q extends SimpleA final FilterManager filters, final Class messageClass, final String consumerName, - EnumSet optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException + EnumSet optionSet) + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException, + ConsumerAccessRefused { // Access control @@ -396,15 +600,77 @@ abstract class SimpleAMQQueue, Q extends SimpleA throw new ExistingExclusiveConsumer(); } + switch(_exclusivityPolicy) + { + case CONNECTION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel(); + addExclusivityConstraint(target.getSessionModel().getConnectionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel().getConnectionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case SESSION: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel(); + addExclusivityConstraint(target.getSessionModel()); + } + else + { + if(_exclusiveOwner != target.getSessionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case LINK: + if(getConsumerCount() != 0) + { + throw new ConsumerAccessRefused(); + } + break; + case PRINCIPAL: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case CONTAINER: + if(_exclusiveOwner == null) + { + _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else + { + if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case NONE: + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); - if (exclusive && !isTransient && getConsumerCount() != 0) - { - throw new ExistingConsumerPreventsExclusive(); - } - QueueConsumer consumer = new QueueConsumer(filters, messageClass, optionSet.contains(Consumer.Option.ACQUIRES), optionSet.contains(Consumer.Option.SEES_REQUEUES), @@ -473,11 +739,12 @@ abstract class SimpleAMQQueue, Q extends SimpleA consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); + consumer.setQueueContext(null); - if(!isDeleted() && isExclusive() && getConsumerCount() == 0) + if(_exclusivityPolicy == ExclusivityPolicy.LINK) { - setAuthorizationHolder(null); + _exclusiveOwner = null; } if(_messageGroupManager != null) @@ -495,8 +762,12 @@ abstract class SimpleAMQQueue, Q extends SimpleA // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) + if(!consumer.isTransient() + && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS ) + && getConsumerCount() == 0) { + if (_logger.isInfoEnabled()) { _logger.info("Auto-deleting queue:" + this); @@ -1266,12 +1537,14 @@ abstract class SimpleAMQQueue, Q extends SimpleA }); } - public void addQueueDeleteTask(final Action task) + @Override + public void addDeleteTask(final Action task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Action task) + @Override + public void removeDeleteTask(final Action task) { _deleteTaskList.remove(task); } @@ -1343,9 +1616,9 @@ abstract class SimpleAMQQueue, Q extends SimpleA } - for (Action task : _deleteTaskList) + for (Action task : _deleteTaskList) { - task.performAction(this); + task.performAction((Q)this); } _deleteTaskList.clear(); @@ -1940,6 +2213,26 @@ abstract class SimpleAMQQueue, Q extends SimpleA return _notificationChecks; } + private static class DeleteDeleteTask implements Action + { + + private final Deletable _lifetimeObject; + private final Action _deleteQueueOwnerTask; + + public DeleteDeleteTask(final Deletable lifetimeObject, + final Action deleteQueueOwnerTask) + { + _lifetimeObject = lifetimeObject; + _deleteQueueOwnerTask = deleteQueueOwnerTask; + } + + @Override + public void performAction(final Deletable object) + { + _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask); + } + } + private final class QueueEntryListener implements StateChangeListener { @@ -1990,38 +2283,6 @@ abstract class SimpleAMQQueue, Q extends SimpleA return ids; } - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusive = true; - _exclusiveOwner = exclusiveOwner; - } - - - public void configure(QueueConfiguration config) - { - if (config != null) - { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - setMaximumDeliveryCount(config.getMaxDeliveryCount()); - _capacity = config.getCapacity(); - _flowResumeCapacity = config.getFlowResumeCapacity(); - } - } - - public long getMessageDequeueCount() - { - return _dequeueCount.get(); - } - public long getTotalEnqueueSize() { return _enqueueSize.get(); @@ -2130,20 +2391,13 @@ abstract class SimpleAMQQueue, Q extends SimpleA @Override public void setDescription(String description) { - if (description == null) - { - _arguments.remove(Queue.DESCRIPTION); - } - else - { - _arguments.put(Queue.DESCRIPTION, description); - } + _description = description; } @Override public String getDescription() { - return (String) _arguments.get(Queue.DESCRIPTION); + return _description; } public final > int send(final M message, @@ -2176,4 +2430,228 @@ abstract class SimpleAMQQueue, Q extends SimpleA } + @Override + public boolean verifySessionAccess(final AMQSessionModel session) + { + boolean allowed; + switch(_exclusivityPolicy) + { + case NONE: + allowed = true; + break; + case SESSION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session; + break; + case CONNECTION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel(); + break; + case PRINCIPAL: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal()); + break; + case CONTAINER: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName()); + break; + case LINK: + allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } + return allowed; + } + + @Override + public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) + throws ExistingConsumerPreventsExclusive + { + if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE)) + { + switch(desiredPolicy) + { + case NONE: + _exclusiveOwner = null; + break; + case PRINCIPAL: + switchToPrincipalExclusivity(); + break; + case CONTAINER: + switchToContainerExclusivity(); + break; + case CONNECTION: + switchToConnectionExclusivity(); + break; + case SESSION: + switchToSessionExclusivity(); + break; + case LINK: + switchToLinkExclusivity(); + break; + } + _exclusivityPolicy = desiredPolicy; + } + } + + private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive + { + switch (getConsumerCount()) + { + case 1: + _exclusiveSubscriber = getConsumerList().getHead().getConsumer(); + // deliberate fall through + case 0: + _exclusiveOwner = null; + break; + default: + throw new ExistingConsumerPreventsExclusive(); + } + + } + + private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive + { + + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + case CONTAINER: + case CONNECTION: + AMQSessionModel session = null; + for(Consumer c : getConsumers()) + { + if(session == null) + { + session = c.getSessionModel(); + } + else if(!session.equals(c.getSessionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = session; + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + case PRINCIPAL: + AMQConnectionModel con = null; + for(Consumer c : getConsumers()) + { + if(con == null) + { + con = c.getSessionModel().getConnectionModel(); + } + else if(!con.equals(c.getSessionModel().getConnectionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = con; + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + String containerID = null; + for(Consumer c : getConsumers()) + { + if(containerID == null) + { + containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = containerID; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + } + + private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + Principal principal = null; + for(Consumer c : getConsumers()) + { + if(principal == null) + { + principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = principal; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + } + + private class ClearOwnerAction implements Action + { + private final Deletable _lifetimeObject; + private DeleteDeleteTask _deleteTask; + + public ClearOwnerAction(final Deletable lifetimeObject) + { + _lifetimeObject = lifetimeObject; + } + + @Override + public void performAction(final Deletable object) + { + if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject) + { + SimpleAMQQueue.this._exclusiveOwner = null; + } + if(_deleteTask != null) + { + removeDeleteTask(_deleteTask); + } + } + + public void setDeleteTask(final DeleteDeleteTask deleteTask) + { + _deleteTask = deleteTask; + } + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index f85f9479bb..d1da7feddc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -21,11 +21,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class SortedQueue extends OutOfOrderQueue { @@ -35,28 +37,26 @@ public class SortedQueue extends OutOfOrderQueue arguments, String sortedPropertyName) + protected SortedQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, + Map attributes, + QueueEntryListFactory factory) { - this(id, name, durable, owner, autoDelete, exclusive, - virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName)); + super(virtualHost, creatingSession, attributes, factory); + _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes); } - protected SortedQueue(UUID id, final String name, - final boolean durable, final String owner, final boolean autoDelete, - final boolean exclusive, final VirtualHost virtualHost, - Map arguments, - String sortedPropertyName, - QueueEntryListFactory factory) + protected SortedQueue(VirtualHost virtualHost, + final AMQSessionModel creatingSession, Map attributes) { - super(id, name, durable, owner, autoDelete, exclusive, - virtualHost, factory, arguments); - this._sortedPropertyName = sortedPropertyName; + this(virtualHost, + creatingSession, attributes, + new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes))); } + public String getSortedPropertyName() { return _sortedPropertyName; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java index e2ccdc45cf..8a0570eb4e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -20,22 +20,16 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -import java.util.UUID; public class StandardQueue extends SimpleAMQQueue { - public StandardQueue(final UUID id, - final String name, - final boolean durable, - final String owner, - final boolean autoDelete, - final boolean exclusive, - final VirtualHost virtualHost, - final Map arguments) + public StandardQueue(final VirtualHost virtualHost, + final AMQSessionModel creatingSession, final Map arguments) { - super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments); + super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory()); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index a379f85bbb..b9e0e5920f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.queue.AMQQueue; /** @@ -139,8 +140,8 @@ public class ObjectProperties { setName(queue.getName()); - put(Property.AUTO_DELETE, queue.isAutoDelete()); - put(Property.TEMPORARY, queue.isAutoDelete()); + put(Property.AUTO_DELETE, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); + put(Property.TEMPORARY, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); put(Property.DURABLE, queue.isDurable()); put(Property.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) @@ -151,10 +152,7 @@ public class ObjectProperties { put(Property.OWNER, queue.getOwner()); } - else if (queue.getAuthorizationHolder() != null) - { - put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName()); - } + } public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index fc5806ccce..096734ff73 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -42,17 +42,11 @@ public class DurableConfigurationStoreHelper private static final String BINDING = Binding.class.getSimpleName(); private static final String EXCHANGE = Exchange.class.getSimpleName(); private static final String QUEUE = Queue.class.getSimpleName(); - private static final Set QUEUE_ARGUMENTS_EXCLUDES = new HashSet(Arrays.asList(Queue.NAME, - Queue.OWNER, - Queue.EXCLUSIVE, - Queue.ALTERNATE_EXCHANGE)); + private static final Set QUEUE_ARGUMENTS_EXCLUDES = new HashSet(Arrays.asList(Queue.ALTERNATE_EXCHANGE)); public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) { Map attributesMap = new LinkedHashMap(); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.OWNER, queue.getOwner()); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) { @@ -75,9 +69,6 @@ public class DurableConfigurationStoreHelper public static void createQueue(DurableConfigurationStore store, AMQQueue queue) { Map attributesMap = new HashMap(); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.OWNER, queue.getOwner()); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); if (queue.getAlternateExchange() != null) { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); @@ -103,7 +94,7 @@ public class DurableConfigurationStoreHelper Map attributesMap = new HashMap(); attributesMap.put(Exchange.NAME, exchange.getName()); attributesMap.put(Exchange.TYPE, exchange.getTypeName()); - attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() + attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name()); store.create(exchange.getId(), EXCHANGE, attributesMap); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java new file mode 100644 index 0000000000..a6b16bbfeb --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public interface Deletable +{ + void addDeleteTask(Action task); + void removeDeleteTask(Action task); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java index 37e0177b00..3543ce3bcf 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; public class MapValueConverter { @@ -217,6 +218,13 @@ public class MapValueConverter return getIntegerAttribute(name, attributes, null); } + public static Long getLongAttribute(String name, Map attributes) + { + assertMandatoryAttribute(name, attributes); + Object obj = attributes.get(name); + return toLong(name, obj, null); + } + public static Long getLongAttribute(String name, Map attributes, Long defaultValue) { Object obj = attributes.get(name); @@ -409,4 +417,41 @@ public class MapValueConverter return (T) value; } + + public static UUID getUUIDAttribute(String name, Map attributes) + { + assertMandatoryAttribute(name, attributes); + return getUUIDAttribute(name, attributes, null); + } + + public static UUID getUUIDAttribute(String name, Map attributes, UUID defaultVal) + { + final Object value = attributes.get(name); + return toUUID(value, defaultVal); + } + + private static UUID toUUID(final Object value, final UUID defaultVal) + { + if(value == null) + { + return defaultVal; + } + else if(value instanceof UUID) + { + return (UUID)value; + } + else if(value instanceof String) + { + return UUID.fromString((String)value); + } + else if(value instanceof byte[]) + { + return UUID.nameUUIDFromBytes((byte[])value); + } + else + { + throw new IllegalArgumentException("Cannot convert " + value.getClass().getName() + " to UUID"); + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index aa70bb3e8d..0e9b879316 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -69,7 +73,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener @@ -529,7 +535,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg int purged = queue.delete(); getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !queue.isAutoDelete()) + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) { DurableConfigurationStore store = getDurableConfigurationStore(); DurableConfigurationStoreHelper.removeQueue(store, queue); @@ -538,26 +547,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) throws QpidSecurityException, QueueExistsException + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map attributes) throws QpidSecurityException, QueueExistsException { + // make a copy as we may augment (with an ID for example) + attributes = new LinkedHashMap(attributes); - if (queueName == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } + String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); + ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - // Access check + // Access check if (!getSecurityManager().authoriseCreateQueue(autoDelete, durable, - exclusive, + exclusive != null && exclusive != ExclusivityPolicy.NONE, null, null, queueName, @@ -573,22 +580,27 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); } - if(id == null) + if(!attributes.containsKey(Queue.ID)) { - id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName()); while(_queueRegistry.getQueue(id) != null) { id = UUID.randomUUID(); } + attributes.put(Queue.ID, id); } - else if(_queueRegistry.getQueue(id) != null) + else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) { - throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + throw new QueueExistsException("Queue with id " + + MapValueConverter.getUUIDAttribute(Queue.ID, + attributes) + + " already exists", _queueRegistry.getQueue(queueName)); } - return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, - arguments); + + + return _queueFactory.createQueue(creatingSession, attributes); } } @@ -980,13 +992,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on connection " + connection); } - for (AMQSessionModel session : connection.getSessionModels()) + for (AMQSessionModel session : connection.getSessionModels()) { if (_logger.isDebugEnabled()) { @@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { return _model; } + + } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return getConfiguration().getMaximumMessageAge(); + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return getConfiguration().getMaximumMessageSize(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return getConfiguration().getMaximumMessageCount(); + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return getConfiguration().getMaximumQueueDepth(); + } + + @Override + public long getDefaultAlertRepeatGap() + { + return getConfiguration().getMinimumAlertRepeatGap(); + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return getConfiguration().getCapacity(); + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return getConfiguration().getFlowResumeCapacity(); + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return getConfiguration().getMaxDeliveryCount(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 12f8c7dae8..88caf73032 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -42,6 +42,7 @@ import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; public class DefaultUpgraderProvider implements UpgraderProvider { + public static final String EXCLUSIVE = "exclusive"; private final ExchangeRegistry _exchangeRegistry; private final VirtualHost _virtualHost; @@ -63,7 +64,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); case 2: currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); - + case 3: + currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -263,4 +265,49 @@ public class DefaultUpgraderProvider implements UpgraderProvider } } + /* + * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum + * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" + * ensure OWNER is null unless the exclusivity policy is CONTAINER + */ + private class Version3Upgrader extends NonNullUpgrader + { + + @Override + public void configuredObject(UUID id, String type, Map attributes) + { + if(Queue.class.getSimpleName().equals(type)) + { + Map newAttributes = new LinkedHashMap(attributes); + if(attributes.get(EXCLUSIVE) instanceof Boolean) + { + boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE); + newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); + if(!isExclusive && attributes.containsKey("owner")) + { + newAttributes.remove("owner"); + } + } + else + { + newAttributes.remove("owner"); + } + if(!attributes.containsKey("durable")) + { + newAttributes.put("durable","true"); + } + attributes = newAttributes; + getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 59ff1ce6a1..c687cbda92 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -69,7 +69,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT; try { _exchange = _exchangeRegistry.getExchange(id); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index fd836fdd98..621ea02059 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -104,13 +104,6 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer queueArgumentsMap = new LinkedHashMap(_attributes); - queueArgumentsMap.remove(Queue.NAME); - queueArgumentsMap.remove(Queue.OWNER); - queueArgumentsMap.remove(Queue.EXCLUSIVE); try { @@ -122,8 +115,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer attributes = new LinkedHashMap(_attributes); + attributes.put(Queue.ID, _id); + _queue = _queueFactory.restoreQueue(attributes); } } catch (QpidSecurityException e) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7cd9045af8..9996684bad 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.QpidSecurityException; @@ -59,14 +60,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable int removeQueue(AMQQueue queue) throws QpidSecurityException; - AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) throws QueueExistsException, QpidSecurityException; + AMQQueue createQueue(final AMQSessionModel creatingSession, Map arguments) throws QueueExistsException, QpidSecurityException; Exchange createExchange(UUID id, @@ -130,4 +124,20 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable public void block(); public void unblock(); + + long getDefaultAlertThresholdMessageAge(); + + long getDefaultAlertThresholdMessageSize(); + + long getDefaultAlertThresholdQueueDepthMessages(); + + long getDefaultAlertThresholdQueueDepthBytes(); + + long getDefaultAlertRepeatGap(); + + long getDefaultQueueFlowControlSizeBytes(); + + long getDefaultQueueFlowResumeSizeBytes(); + + int getDefaultMaximumDeliveryAttempts(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 96ac92a0f9..2c3420a718 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -137,9 +137,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageAge()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageAge()); } public void testGetMaximumQueueDepth() throws ConfigurationException @@ -153,9 +150,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumQueueDepth()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumQueueDepth()); } public void testGetMaximumMessageSize() throws ConfigurationException @@ -169,9 +163,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageSize()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageSize()); } public void testGetMaximumMessageCount() throws ConfigurationException @@ -185,22 +176,11 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMaximumMessageCount()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMaximumMessageCount()); } public void testGetMinimumAlertRepeatGap() throws Exception { - // set broker attribute ALERT_REPEAT_GAP to 10 - when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10); - - // check that broker level setting is available on queue configuration QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); - assertEquals(10, qConf.getMinimumAlertRepeatGap()); - - // remove configuration for ALERT_REPEAT_GAP on broker level - when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(null); // Check default value qConf = new QueueConfiguration("test", _emptyConf); @@ -211,9 +191,6 @@ public class QueueConfigurationTest extends TestCase qConf = new QueueConfiguration("test", vhostConfig); assertEquals(2, qConf.getMinimumAlertRepeatGap()); - // Check inherited value - qConf = new QueueConfiguration("test", _fullHostConf); - assertEquals(1, qConf.getMinimumAlertRepeatGap()); } public void testSortQueueConfiguration() throws ConfigurationException diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 5a12a411cf..1358798a38 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -35,8 +35,10 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; +import java.security.Principal; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -333,14 +335,26 @@ public class MockConsumer implements ConsumerTarget } @Override - public int compareTo(AMQSessionModel o) + public void close(AMQConstant cause, String message) { - return getId().compareTo(o.getId()); } @Override - public void close(AMQConstant cause, String message) + public void addDeleteTask(final Action task) { + + } + + @Override + public void removeDeleteTask(final Action task) + { + + } + + @Override + public int compareTo(final Object o) + { + return 0; } } @@ -430,12 +444,6 @@ public class MockConsumer implements ConsumerTarget return null; } - @Override - public String getUserName() - { - return null; - } - @Override public boolean isSessionNameUnique(byte[] name) { @@ -454,6 +462,12 @@ public class MockConsumer implements ConsumerTarget return null; } + @Override + public String getRemoteContainerName() + { + return null; + } + @Override public String getClientVersion() { @@ -467,7 +481,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public String getPrincipalAsString() + public Principal getAuthorizedPrincipal() { return null; } @@ -512,5 +526,17 @@ public class MockConsumer implements ConsumerTarget { return null; } + + @Override + public void addDeleteTask(final Action task) + { + + } + + @Override + public void removeDeleteTask(final Action task) + { + + } } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 573f4a4aaa..e8a2223b15 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,17 +20,23 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import junit.framework.Assert; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -70,10 +76,17 @@ public class TopicExchangeTest extends QpidTestCase } } + private AMQQueue createQueue(String name) throws QpidSecurityException, QueueExistsException + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, name); + return _vhost.createQueue(null, attributes); + } + public void testNoRoute() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a*#b"); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -84,8 +97,7 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, - false, null); + AMQQueue queue = createQueue("ab"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -107,7 +119,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); + AMQQueue queue = createQueue("a*"); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -138,7 +150,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); + AMQQueue queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -189,8 +201,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); routeMessage("a.c.d.b",0l); @@ -215,8 +226,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchAfterHash() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -254,8 +264,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -276,8 +285,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a#"); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -298,8 +306,7 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); int queueCount = routeMessage("a.b.c",0l); @@ -328,8 +335,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreRouting() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -342,8 +348,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws Exception { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, - false, null); + AMQQueue queue = createQueue("a"); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index 13e637542b..4d38025068 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.protocol.AMQConnectionModel; +import java.security.Principal; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,9 +45,12 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject { super.setUp(); + final Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(USER); + _connection = mock(AMQConnectionModel.class); when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); - when(_connection.getPrincipalAsString()).thenReturn(USER); + when(_connection.getAuthorizedPrincipal()).thenReturn(principal); when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); when(_connection.getVirtualHostName()).thenReturn(VHOST); _subject = new ConnectionLogSubject(_connection); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 35ffd08863..4b6a51f84b 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.queue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,9 +40,10 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -98,33 +94,19 @@ public class AMQQueueFactoryTest extends QpidTestCase private void delegateVhostQueueCreation() throws Exception { - final ArgumentCaptor id = ArgumentCaptor.forClass(UUID.class); - final ArgumentCaptor queueName = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor durable = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor owner = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor autoDelete = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor exclusive = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class); - final ArgumentCaptor arguments = ArgumentCaptor.forClass(Map.class); - - when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(), - autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then( + + final ArgumentCaptor attributes = ArgumentCaptor.forClass(Map.class); + + when(_virtualHost.createQueue(any(AMQSessionModel.class), attributes.capture())).then( new Answer() { @Override public AMQQueue answer(InvocationOnMock invocation) throws Throwable { - return _queueFactory.createQueue(id.getValue(), - queueName.getValue(), - durable.getValue(), - owner.getValue(), - autoDelete.getValue(), - exclusive.getValue(), - deleteOnNoConsumer.getValue(), - arguments.getValue()); + return _queueFactory.createQueue(null, attributes.getValue()); } } - ); + ); } private void mockQueueRegistry() @@ -217,17 +199,14 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testPriorityQueueRegistration() throws Exception { - Map attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testPriorityQueue"); + attributes.put(Queue.PRIORITIES, 5); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testPriorityQueue", - false, - "owner", - false, - false, - false, - attributes); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -240,10 +219,12 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = getName(); String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, - false, - null); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass()); verifyQueueRegistered(queueName); @@ -261,7 +242,6 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueEnabled() throws Exception { - Map attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -270,14 +250,13 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - attributes); + Map attributes = new HashMap(); + + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -314,14 +293,11 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - null); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); @@ -348,7 +324,8 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueDisabled() throws Exception { - Map attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false); + Map attributes = new HashMap(); + String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -357,14 +334,11 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - false, - false, - false, - attributes); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); + + AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); @@ -382,7 +356,6 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception { - Map attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; @@ -391,16 +364,18 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + //create an autodelete queue - AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - queueName, - false, - "owner", - true, - false, - false, - attributes); - assertTrue("Queue should be autodelete", queue.isAutoDelete()); + AMQQueue queue = _queueFactory.createQueue(null, attributes); + assertEquals("Queue should be autodelete", + LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS, + queue.getLifetimePolicy()); //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); @@ -417,16 +392,13 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testMaximumDeliveryCount() throws Exception { - Map attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCount"); + + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testMaximumDeliveryCount", - false, - "owner", - false, - false, - false, - attributes); + final AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); @@ -440,14 +412,11 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), - "testMaximumDeliveryCount", - false, - "owner", - false, - false, - false, - null); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault"); + + final AMQQueue queue = _queueFactory.createQueue(null, attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); @@ -462,15 +431,16 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, - false, - null); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + + _queueFactory.createQueue(null, attributes); fail("queue with null name can not be created!"); } catch (Exception e) { assertTrue(e instanceof IllegalArgumentException); - assertEquals("Queue name must not be null", e.getMessage()); + assertEquals("Value for attribute name is not found", e.getMessage()); } } @@ -486,9 +456,14 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - Map attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, false, attributes); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + _queueFactory.createQueue(null, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -511,9 +486,14 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - Map attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, false, attributes); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + + _queueFactory.createQueue(null, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -528,6 +508,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { Map arguments = new HashMap(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"mykey"); arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java index c2291f5eed..42b83d8c47 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java @@ -26,9 +26,11 @@ import junit.framework.TestCase; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; public class ConflationQueueListTest extends TestCase @@ -46,8 +48,11 @@ public class ConflationQueueListTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class), - Collections.emptyMap(),CONFLATION_KEY); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); + _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes); _list = _queue.getEntries(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index c462468819..91c0136af6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; @@ -131,6 +132,12 @@ public class MockAMQQueue implements AMQQueue return 0; } + @Override + public LifetimePolicy getLifetimePolicy() + { + return null; + } + public int getBindingCountHigh() { return 0; @@ -216,7 +223,7 @@ public class MockAMQQueue implements AMQQueue } @Override - public void addQueueDeleteTask(final Action task) + public void addDeleteTask(final Action task) { } @@ -345,7 +352,7 @@ public class MockAMQQueue implements AMQQueue } @Override - public void removeQueueDeleteTask(final Action task) + public void removeDeleteTask(final Action task) { } @@ -478,6 +485,12 @@ public class MockAMQQueue implements AMQQueue return false; } + @Override + public boolean verifySessionAccess(final AMQSessionModel session) + { + return false; + } + public Exchange getAlternateExchange() { return null; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 3db5d0fb62..0ac4eb8f78 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -26,10 +26,12 @@ import static org.mockito.Mockito.when; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; public class PriorityQueueListTest extends QpidTestCase @@ -45,16 +47,11 @@ public class PriorityQueueListTest extends QpidTestCase protected void setUp() { QueueEntry[] entries = new QueueEntry[PRIORITIES.length]; - - PriorityQueue queue = new PriorityQueue(UUID.randomUUID(), - getName(), - false, - null, - false, - false, - mock(VirtualHost.class), - Collections.emptyMap(), - 10); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.PRIORITIES, 10); + PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), null, queueAttributes); _list = queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 66e4286df7..139e32420f 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -27,11 +27,13 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.lang.reflect.Field; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Mockito.mock; @@ -199,8 +201,10 @@ public abstract class QueueEntryImplTestBase extends TestCase { int numberOfEntries = 5; QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; - StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, - mock(VirtualHost.class), Collections.emptyMap()); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); OrderedQueueEntryList queueEntryList = queue.getEntries(); // create test entries diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java index 1934349a62..956a9fc424 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java @@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.Map; +import java.util.*; import org.apache.log4j.Logger; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.exchange.DirectExchange; @@ -51,10 +51,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - abstract class SimpleAMQQueueTestBase, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> extends QpidTestCase { private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class); @@ -68,7 +64,7 @@ abstract class SimpleAMQQueueTestBase, Q extends private DirectExchange _exchange; private MockConsumer _consumerTarget = new MockConsumer(); private QueueConsumer _consumer; - private Map _arguments = null; + private Map _arguments = Collections.emptyMap(); @Override public void setUp() throws Exception @@ -78,8 +74,12 @@ abstract class SimpleAMQQueueTestBase, Q extends _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, - false, false, false, _arguments); + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, _qname); + attributes.put(Queue.OWNER, _owner); + + _queue = (Q) _virtualHost.createQueue(null, attributes); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); } @@ -104,9 +104,10 @@ abstract class SimpleAMQQueueTestBase, Q extends _queue.stop(); try { - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, - false, _owner, false, - false, false, _arguments); + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + + _queue = (Q) _virtualHost.createQueue(null, attributes); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -115,10 +116,10 @@ abstract class SimpleAMQQueueTestBase, Q extends e.getMessage().contains("name")); } - _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), - "differentName", false, - _owner, false, - false, false, _arguments); + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "differentName"); + _queue = (Q) _virtualHost.createQueue(null, attributes); assertNotNull("Queue was not created", _queue); } @@ -1137,15 +1138,17 @@ abstract class SimpleAMQQueueTestBase, Q extends { public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost) { - super(UUIDGenerator.generateRandomUUID(), - "testQueue", - false, - "testOwner", - false, - false, - vhost, - factory, - null); + super(vhost, null, attributes(), factory); + } + + private static Map attributes() + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "test"); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + return attributes; } @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index c115af5a38..286c47fdf6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -21,11 +21,15 @@ package org.apache.qpid.server.queue; import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import java.util.HashMap; +import java.util.Map; + public class SimpleAMQQueueThreadPoolTest extends QpidTestCase { @@ -50,8 +54,10 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase try { - SimpleAMQQueue queue = (SimpleAMQQueue) - test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "test"); + AMQQueue queue = test.createQueue(null, attributes); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 9457d59300..7aa546e1b3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Mockito.mock; @@ -38,8 +41,10 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase public void setUp() throws Exception { mockLogging(); - - StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest"); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); queueEntryList = queue.getEntries(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index f8953bb7cc..cfa58f11c4 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -26,9 +26,13 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import static org.mockito.Matchers.eq; @@ -74,8 +78,15 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase attributes = new HashMap(); + attributes.put(Queue.ID,UUID.randomUUID()); + attributes.put(Queue.NAME, getName()); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Queue.SORT_KEY, "KEY"); + // Create test list - _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory() + _testQueue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory() { @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 25d6dbba60..e55875135a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -20,11 +20,15 @@ package org.apache.qpid.server.queue; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import static org.mockito.Matchers.eq; @@ -42,7 +46,15 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase public void setUp() throws Exception { mockLogging(); - SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory() + + Map attributes = new HashMap(); + attributes.put(Queue.ID,UUID.randomUUID()); + attributes.put(Queue.NAME, getName()); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Queue.SORT_KEY, "KEY"); + + SortedQueue queue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory() { @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 2c697cfe2b..fe9273dbc8 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -22,9 +22,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -45,8 +46,11 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBaseemptyMap()); + + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + _testQueue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); _sqel = _testQueue.getEntries(); for(int i = 1; i <= 100; i++) @@ -86,9 +90,10 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBaseemptyMap()); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes); return queue.getEntries(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index c7b812effe..77886fa15e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -25,18 +25,21 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; -import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; + public class StandardQueueTest extends SimpleAMQQueueTestBase { @@ -44,7 +47,12 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); + queueAttributes.put(Queue.OWNER, "testOwner"); + + setQueue(new StandardQueue(null, null, queueAttributes)); assertNull("Queue was created", getQueue()); } catch (IllegalArgumentException e) @@ -58,7 +66,13 @@ public class StandardQueueTest extends SimpleAMQQueueTestBaseemptyMap())); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getQname()); + queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes); + + setQueue(queue); getQueue().setDeleteOnNoConsumers(true); ServerMessage message = createMessage(25l); @@ -75,8 +89,12 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); + queueAttributes.put(Queue.OWNER, "testOwner"); + final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes); //verify adding an active consumer increases the count final MockConsumer consumer1 = new MockConsumer(); @@ -145,8 +163,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, "test"); // create queue with overridden method deliverAsync - StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test", - false, "testOwner", false, false, getVirtualHost(), null) + StandardQueue testQueue = new StandardQueue(getVirtualHost(), null, queueAttributes) { @Override public void deliverAsync(QueueConsumer sub) @@ -249,16 +268,19 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase { - public DequeuedQueue(final UUID id, - final String queueName, - final boolean durable, - final String owner, - final boolean autoDelete, - final boolean exclusive, - final VirtualHost virtualHost, - final Map arguments) + public DequeuedQueue(VirtualHost virtualHost) + { + super(virtualHost, null, attributes(), new DequeuedQueueEntryListFactory()); + } + + private static Map attributes() { - super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "test"); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + return attributes; } } private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 8b66e7d82f..c5c68ecf45 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.times; import java.io.File; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -40,6 +41,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; @@ -143,7 +145,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", - org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); + org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()))); } private Map map(Object... vals) @@ -220,7 +222,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map queueAttributes = new HashMap(); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -240,7 +242,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -258,7 +260,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map queueAttributes = new HashMap(); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -292,8 +294,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map queueAttributes = new HashMap(); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -320,8 +320,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map queueAttributes = new HashMap(); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); @@ -361,13 +359,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); - when(queue.getOwner()).thenReturn(queueOwner); when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); - if(arguments != null && !arguments.isEmpty()) + final Map attributes = arguments == null ? new LinkedHashMap() : new LinkedHashMap(arguments); + attributes.put(Queue.NAME, queueName); + if(exclusive) { - when(queue.getAvailableAttributes()).thenReturn(arguments.keySet()); + when(queue.getOwner()).thenReturn(queueOwner); + + attributes.put(Queue.OWNER, queueOwner); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER); + } + when(queue.getAvailableAttributes()).thenReturn(attributes.keySet()); final ArgumentCaptor requestedAttribute = ArgumentCaptor.forClass(String.class); when(queue.getAttribute(requestedAttribute.capture())).then( new Answer() @@ -377,10 +381,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public Object answer(final InvocationOnMock invocation) throws Throwable { String attrName = requestedAttribute.getValue(); - return arguments.get(attrName); + return attributes.get(attrName); } }); - } return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index fb63fefb88..f082d58d39 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -25,11 +25,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.net.SocketAddress; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -182,8 +184,10 @@ public class BrokerTestHelper public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) throws QpidSecurityException, QueueExistsException { - AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, - false, false, false, Collections.emptyMap()); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, queueName); + AMQQueue queue = virtualHost.createQueue(null, attributes); return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 223e2c5218..1e25aac197 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -117,14 +118,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase - final ArgumentCaptor idArg = ArgumentCaptor.forClass(UUID.class); - final ArgumentCaptor queueArg = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor argsArg = ArgumentCaptor.forClass(Map.class); + final ArgumentCaptor attributesArg = ArgumentCaptor.forClass(Map.class); _queueFactory = mock(QueueFactory.class); - when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(), - anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( + when(_queueFactory.restoreQueue(attributesArg.capture())).then( new Answer() { @@ -133,8 +131,9 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { final AMQQueue queue = mock(AMQQueue.class); - final String queueName = queueArg.getValue(); - final UUID queueId = idArg.getValue(); + final Map attributes = attributesArg.getValue(); + final String queueName = (String) attributes.get(Queue.NAME); + final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); when(queue.getName()).thenReturn(queueName); when(queue.getId()).thenReturn(queueId); @@ -153,10 +152,10 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return null; } } - ).when(queue).setAlternateExchange(altExchangeArg.capture()); + ).when(queue).setAlternateExchange(altExchangeArg.capture()); - Map args = argsArg.getValue(); - if(args.containsKey(Queue.ALTERNATE_EXCHANGE)) + Map args = attributes; + if (args.containsKey(Queue.ALTERNATE_EXCHANGE)) { final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); final Exchange exchange = _exchangeRegistry.getExchange(exchangeId); @@ -470,7 +469,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString()); } - queue.put(Queue.EXCLUSIVE, false); return queue; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index f7eecc73fc..cee7edf1b7 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -152,14 +153,7 @@ public class MockVirtualHost implements VirtualHost } @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map arguments) + public AMQQueue createQueue(final AMQSessionModel creatingSession, Map arguments) { return null; } @@ -314,4 +308,52 @@ public class MockVirtualHost implements VirtualHost public void unblock() { } + + @Override + public long getDefaultAlertThresholdMessageAge() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdMessageSize() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdQueueDepthMessages() + { + return 0; + } + + @Override + public long getDefaultAlertThresholdQueueDepthBytes() + { + return 0; + } + + @Override + public long getDefaultAlertRepeatGap() + { + return 0; + } + + @Override + public long getDefaultQueueFlowControlSizeBytes() + { + return 0; + } + + @Override + public long getDefaultQueueFlowResumeSizeBytes() + { + return 0; + } + + @Override + public int getDefaultMaximumDeliveryAttempts() + { + return 0; + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index ff7ce0a79d..9c012eb782 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -25,6 +25,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; @@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; @@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder +public class ServerConnection extends Connection implements AMQConnectionModel, + LogSubject, AuthorizationHolder { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; + + private final CopyOnWriteArrayList> _taskList = + new CopyOnWriteArrayList>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask = task; } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(ServerSession session, AMQConstant cause, String message) { ExecutionException ex = new ExecutionException(); ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; @@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } ex.setErrorCode(code); ex.setDescription(message); - ((ServerSession)session).invoke(ex); + session.invoke(ex); session.close(cause, message); } @@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) { closeSubscriptions(); + performDeleteTasks(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { @@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel, close(replyCode, message); } + protected void performDeleteTasks() + { + for(Action task : _taskList) + { + task.performAction(this); + } + } + public synchronized void block() { if(!_blocking) @@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.removeSession(ssn); } - public List getSessionModels() + public List getSessionModels() { - List sessions = new ArrayList(); + List sessions = new ArrayList(); for (Session ssn : getChannels()) { - sessions.add((AMQSessionModel) ssn); + sessions.add((ServerSession) ssn); } return sessions; } @@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return String.valueOf(getRemoteAddress()); } - public String getUserName() - { - return _authorizedPrincipal.getName(); - } - @Override public void closed() { + performDeleteTasks(); closeSubscriptions(); super.closed(); } @@ -521,6 +533,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return getConnectionDelegate().getClientId(); } + @Override + public String getRemoteContainerName() + { + return getConnectionDelegate().getClientId(); + } + @Override public String getClientVersion() { @@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return getConnectionDelegate().getClientProduct(); } - public String getPrincipalAsString() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - public long getSessionCountLimit() { return getChannelMax(); @@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.doHeartBeat(); } + + @Override + public void addDeleteTask(final Action task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action task) + { + _taskList.remove(task); + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index c85a415ce5..dc26249c61 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.Principal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -310,14 +311,18 @@ public class ServerConnectionDelegate extends ServerDelegate private boolean isSessionNameUnique(final byte[] name, final Connection conn) { final ServerConnection sconn = (ServerConnection) conn; - final String userId = sconn.getUserName(); + final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal(); + final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); final Iterator connections = ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator(); while(connections.hasNext()) { - final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next(); - if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name)) + final AMQConnectionModel amqConnectionModel = connections.next(); + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? "" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name)) { return false; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 0e6b4d3b08..29f9fc549e 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -88,7 +89,9 @@ import static org.apache.qpid.util.Serial.gt; public class ServerSession extends Session implements AuthorizationHolder, - AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder + AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder, + Deletable + { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); @@ -132,7 +135,7 @@ public class ServerSession extends Session private Map _subscriptions = new ConcurrentHashMap(); - private final List> _taskList = new CopyOnWriteArrayList>(); + private final List> _taskList = new CopyOnWriteArrayList>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -374,7 +377,7 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Action task : _taskList) + for (Action task : _taskList) { task.performAction(this); } @@ -610,12 +613,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Action task) + public void addDeleteTask(Action task) { _taskList.add(task); } - public void removeSessionCloseTask(Action task) + public void removeDeleteTask(Action task) { _taskList.remove(task); } @@ -652,7 +655,7 @@ public class ServerSession extends Session return _id; } - public AMQConnectionModel getConnectionModel() + public ServerConnection getConnectionModel() { return getConnection(); } @@ -922,7 +925,7 @@ public class ServerSession extends Session } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(ServerSession o) { return getId().compareTo(o.getId()); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1bd50533ed..b0a60beaf5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,8 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.Exchange; @@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + else if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { - if(queue.isExclusive()) - { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - - ((ServerSession) session).addSessionCloseTask(new Action() - { - public void performAction(ServerSession session) - { - if(queue.getExclusiveOwningSession() == session) - { - queue.setExclusiveOwningSession(null); - } - } - }); - - if(queue.getAuthorizationHolder() == null) - { - queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new Action() - { - public void performAction(ServerSession session) - { - if(queue.getAuthorizationHolder() == session) - { - queue.setAuthorizationHolder(null); - } - } - }); - } - } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy"); + } } } } @@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate try { - String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; final String alternateExchangeName = method.getAlternateExchange(); @@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - final boolean deleteOnNoConsumer = !exclusive && autoDelete; + arguments.put(Queue.ID, id); + arguments.put(Queue.NAME, queueName); - queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, - autoDelete, exclusive, deleteOnNoConsumer, - arguments); - - if (autoDelete && exclusive) + LifetimePolicy lifetime; + if(autoDelete) { - final AMQQueue q = queue; - final Action deleteQueueTask = new Action() - { - public void performAction(ServerSession session) - { - try - { - virtualHost.removeQueue(q); - } - catch (QpidSecurityException e) - { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new Action() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END + : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS; } - if (exclusive) + else { - final AMQQueue q = queue; - final Action removeExclusive = new Action() - { - public void performAction(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new Action() - { - public void performAction(AMQQueue queue) - { - s.removeSessionCloseTask(removeExclusive); - } - }); + lifetime = LifetimePolicy.PERMANENT; } + + arguments.put(Queue.LIFETIME_POLICY, lifetime); + + ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE; + + + arguments.put(Queue.DURABLE, method.getDurable()); + + arguments.put(Queue.EXCLUSIVE, exclusivityPolicy); + + queue = virtualHost.createQueue((ServerSession)session, arguments); + } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + if (!queue.verifySessionAccess((ServerSession)session)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) - { - exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); - } - else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session) + if(!queue.verifySessionAccess((ServerSession)session)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate result.setQueue(queue.getName()); result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); - result.setAutoDelete(queue.isAutoDelete()); + result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT); Map arguments = new LinkedHashMap(); Collection availableAttrs = queue.getAvailableAttributes(); @@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { setThreadSubject(session); - ServerSession serverSession = (ServerSession)session; serverSession.stopSubscriptions(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b15b3f0bfa..fe1cb624e5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -30,6 +31,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; @@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel> + implements AMQSessionModel,T>, + AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AMQProtocolSession _session; + private final T _session; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set _blockingEntities = Collections.synchronizedSet(new HashSet()); @@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + private final List>> _taskList = + new CopyOnWriteArrayList>>(); + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, MessageStore messageStore) throws AMQException { _session = session; @@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException + MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused { if (tag == null) { @@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { filterManager = new SimpleFilterManager(); } - filterManager.add(new FilterSupport.NoLocalFilter(source)); + final Object connectionReference = getConnectionReference(); + filterManager.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + return message.getConnectionReference() != connectionReference; + } + }); } Consumer sub = source.addConsumer(target, @@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _tag2SubscriptionTargetMap.remove(tag); throw e; } + catch (MessageSource.ConsumerAccessRefused e) + { + _tag2SubscriptionTargetMap.remove(tag); + throw e; + } return tag; } @@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F CurrentActor.get().message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); + + for (Action> task : _taskList) + { + task.performAction(this); + } + + _transaction.rollback(); try @@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F Consumer sub = me.getValue().getConsumer(); - - sub.close(); - + if(sub != null) + { + sub.close(); + } } _tag2SubscriptionTargetMap.clear(); @@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _id; } - public AMQConnectionModel getConnectionModel() + @Override + public T getConnectionModel() { return _session; } @@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(AMQChannel o) { return getId().compareTo(o.getId()); } + @Override + public void addDeleteTask(final Action> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action> task) + { + _taskList.remove(task); + } + private class ImmediateAction implements Action> { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 5e95701e5a..68f1ad7942 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; -import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogActor; @@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private VirtualHost _virtualHost; - private final Map _channelMap = new HashMap(); + private final Map> _channelMap = + new HashMap>(); - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + @SuppressWarnings("unchecked") + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set _channelsForCurrentMessage = new HashSet(); - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private final Set> _channelsForCurrentMessage = + new HashSet>(); private final AMQStateManager _stateManager; @@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private SaslServer _saslServer; - private Object _lastReceived; - - private Object _lastSent; - private volatile boolean _closed; // maximum number of channels this session should have @@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); - private FieldTable _clientProperties; - private final List _taskList = new CopyOnWriteArrayList(); + private final List> _taskList = + new CopyOnWriteArrayList>(); private Map _closingChannelsList = new ConcurrentHashMap(); private ProtocolOutputConverter _protocolOutputConverter; @@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastIoTime; private long _writtenBytes; - private long _readBytes; - private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private long _createTime = System.currentTimeMillis(); private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; + private long _readBytes; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + _readBytes += msg.remaining(); _receivedLock.lock(); try { final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - final int len = dataBlocks.size(); - for (int i = 0; i < len; i++) + for (AMQDataBlock dataBlock : dataBlocks) { - AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void receivedComplete() { - for (AMQChannel channel : _channelsForCurrentMessage) + for (AMQChannel channel : _channelsForCurrentMessage) { channel.receivedComplete(); } @@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ private void dataBlockReceived(AMQDataBlock message) throws Exception { - _lastReceived = message; if (message instanceof ProtocolInitiation) { protocolInitiationReceived((ProtocolInitiation) message); @@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); + AMQChannel amqChannel = _channelMap.get(channelId); if(amqChannel != null) { // The _receivedLock is already acquired in the caller @@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt); @@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (Exception e) { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _logger.error("Unexpected exception while processing frame. Closing connection.", e); closeProtocolSession(); @@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentHeader(body); @@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body); } @@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _contextKey = contextKey; } - public List getChannels() + public List> getChannels() { synchronized (_channelMap) { - return new ArrayList(_channelMap.values()); + return new ArrayList>(_channelMap.values()); } } - public AMQChannel getAndAssertChannel(int channelId) throws AMQException + public AMQChannel getAndAssertChannel(int channelId) throws AMQException { - AMQChannel channel = getChannel(channelId); + AMQChannel channel = getChannel(channelId); if (channel == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); @@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return channel; } - public AMQChannel getChannel(int channelId) + public AMQChannel getChannel(int channelId) { - final AMQChannel channel = + final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel channel) throws AMQException { if (_closed) { @@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel channel) throws AMQException + public void commitTransactions(AMQChannel channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void rollbackTransactions(AMQChannel channel) throws AMQException + public void rollbackTransactions(AMQChannel channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel channel = getChannel(channelId); + final AMQChannel channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); @@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() { - for (AMQChannel channel : getChannels()) + for (AMQChannel channel : getChannels()) { channel.close(); } @@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeAllChannels(); - for (Task task : _taskList) + for (Action task : _taskList) { - task.doTask(this); + task.performAction(this); } synchronized(this) @@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (InterruptedException e) { - + // do nothing } finally { @@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - /** @return an object that can be used to identity */ public Object getKey() { @@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setClientProperties(FieldTable clientProperties) { - _clientProperties = clientProperties; - if (_clientProperties != null) + if (clientProperties != null) { - String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); if (closeWhenNoRoute != null) { _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute); @@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); - _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT); + _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); + _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); - String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); + String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); if (clientId != null) { setContextKey(new AMQShortString(clientId)); @@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _protocolVersion.getMinorVersion(); } - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - public MethodRegistry getRegistry() { return getMethodRegistry(); @@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } - public void addSessionCloseTask(Task task) + public void addDeleteTask(Action task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeDeleteTask(Action task) { _taskList.remove(task); } @@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _clientProduct; } - public String getPrincipalAsString() - { - return getAuthId(); - } - public long getSessionCountLimit() { return getMaximumNumberOfChannels(); } - public Boolean isIncoming() - { - return true; - } - - public Boolean isSystemConnection() - { - return false; - } - - public Boolean isFederationLink() - { - return false; - } - - public String getAuthId() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - - public Integer getRemotePID() - { - return null; - } - - public String getRemoteProcessName() - { - return null; - } - - public Integer getRemoteParentPID() - { - return null; - } - public boolean isDurable() { return false; @@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public long getCreateTime() - { - return _createTime; - } - - public Boolean isShadow() - { - return false; - } - - public void mgmtClose() - { - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseBody responseBody = - methodRegistry.createConnectionCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The connection was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use closeConnection in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(0)); - - closeSession(); - - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void mgmtCloseChannel(int channelId) { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public String getClientID() - { - return getContextKey().toString(); - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(AMQChannel session, AMQConstant cause, String message) { - int channelId = ((AMQChannel)session).getChannelId(); + int channelId = session.getChannelId(); closeChannel(channelId, cause, message); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null)); + null)); } public void block() @@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(!_blocking) { _blocking = true; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.block(); } @@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(_blocking) { _blocking = false; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.unblock(); } @@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closed; } - public List getSessionModels() + public List> getSessionModels() { - return new ArrayList(getChannels()); + return new ArrayList>(getChannels()); } public LogSubject getLogSubject() @@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } - public void setDeferFlush(boolean deferFlush) + @Override + public String getRemoteContainerName() { - _deferFlush = deferFlush; + return String.valueOf(getContextKey()); } - public String getUserName() + public void setDeferFlush(boolean deferFlush) { - return getAuthorizedPrincipal().getName(); + _deferFlush = deferFlush; } public final class WriteDeliverMethod diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 58a3b5df12..045367b667 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel +public interface AMQProtocolSession> + extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel> { long getSessionID(); @@ -69,11 +71,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth */ SocketAddress getLocalAddress(); - public static interface Task - { - public void doTask(AMQProtocolSession session); - } - /** * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC * 6). @@ -98,7 +95,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * * @return null if no channel exists, the channel otherwise */ - AMQChannel getChannel(int channelId); + AMQChannel getChannel(int channelId); /** * Associate a channel with this session. @@ -106,7 +103,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * @param channel the channel to associate with this session. It is an error to associate the same channel with more * than one session but this is not validated. */ - void addChannel(AMQChannel channel) throws AMQException; + void addChannel(AMQChannel channel) throws AMQException; /** * Close a specific channel. This will remove any resources used by the channel, including:
  • any queue @@ -185,10 +182,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setVirtualHost(VirtualHost virtualHost) throws AMQException; - void addSessionCloseTask(Task task); - - void removeSessionCloseTask(Task task); - public ProtocolOutputConverter getProtocolOutputConverter(); void setAuthorizedSubject(Subject authorizedSubject); @@ -209,11 +202,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setMaximumNumberOfChannels(Long value); - void commitTransactions(AMQChannel channel) throws AMQException; + void commitTransactions(AMQChannel channel) throws AMQException; - void rollbackTransactions(AMQChannel channel) throws AMQException; + void rollbackTransactions(AMQChannel channel) throws AMQException; - List getChannels(); + List> getChannels(); void mgmtCloseChannel(int channelId); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ce90de7aac..c93c164978 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener try { - if (queue.isExclusive() && !queue.isDurable()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); - } - } Map arguments = FieldTable.convertToMap(body.getArguments()); String bindingKey = String.valueOf(routingKey); @@ -144,10 +135,6 @@ public class QueueBindHandler implements StateAwareMethodListener } } } - catch (AMQException e) - { - throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); - } catch (QpidSecurityException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 5b5525643c..215e3f2f23 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener() { - public void performAction(AMQQueue queue) - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } - } + queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener arguments = + Map attributes = QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - String queueNameString = AMQShortString.toString(queueName); - final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName())); + attributes.put(Queue.DURABLE, durable); - final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, - exclusive, autoDelete, arguments); + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; - if (exclusive && !durable) + if(exclusive) { - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - if (virtualHost.getQueue(queueName.toString()) == queue) - { - try - { - virtualHost.removeQueue(queue); - } - catch (QpidSecurityException e) - { - throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e); - } - } - } - }; - - session.addSessionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action() - { - public void performAction(AMQQueue queue) - { - session.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; } + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + final AMQQueue queue = virtualHost.createQueue(channel, attributes); + return queue; } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 3a9a6dc44e..c939e49aab 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -105,8 +105,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener { private final Port _port; @@ -53,8 +54,33 @@ public class Connection_1_0 implements ConnectionEventListener private final Collection _sessions = Collections.synchronizedCollection(new ArrayList()); private final Object _reference = new Object(); - private List> _closeTasks = - Collections.synchronizedList(new ArrayList>()); + + private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); + private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); + + private final LogSubject _logSubject = new LogSubject() + { + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getRemoteAddressString(), + _vhost.getName()) + + "] "; + + } + }; + + private volatile boolean _stopped; + + + private List> _closeTasks = + Collections.synchronizedList(new ArrayList>()); @@ -69,7 +95,7 @@ public class Connection_1_0 implements ConnectionEventListener _transport = transport; _conn = conn; _connectionId = connectionId; - _vhost.getConnectionRegistry().registerConnection(_model); + _vhost.getConnectionRegistry().registerConnection(this); } @@ -80,7 +106,7 @@ public class Connection_1_0 implements ConnectionEventListener public void remoteSessionCreation(SessionEndpoint endpoint) { - Session_1_0 session = new Session_1_0(_vhost, this); + Session_1_0 session = new Session_1_0(_vhost, this, endpoint); _sessions.add(session); endpoint.setSessionEventListener(session); } @@ -90,24 +116,24 @@ public class Connection_1_0 implements ConnectionEventListener _sessions.remove(session); } - void removeConnectionCloseTask(final Action task) + public void removeDeleteTask(final Action task) { _closeTasks.remove( task ); } - void addConnectionCloseTask(final Action task) + public void addDeleteTask(final Action task) { _closeTasks.add( task ); } public void closeReceived() { - List> taskCopy; + List> taskCopy; synchronized (_closeTasks) { - taskCopy = new ArrayList>(_closeTasks); + taskCopy = new ArrayList>(_closeTasks); } - for(Action task : taskCopy) + for(Action task : taskCopy) { task.performAction(this); } @@ -115,7 +141,7 @@ public class Connection_1_0 implements ConnectionEventListener { _closeTasks.clear(); } - _vhost.getConnectionRegistry().deregisterConnection(_model); + _vhost.getConnectionRegistry().deregisterConnection(this); } @@ -125,30 +151,6 @@ public class Connection_1_0 implements ConnectionEventListener closeReceived(); } - private final AMQConnectionModel _model = new AMQConnectionModel() - { - private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); - private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); - - private final LogSubject _logSubject = new LogSubject() - { - @Override - public String toLogString() - { - return "[" + - MessageFormat.format(CONNECTION_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString(), - _vhost.getName()) - + "] "; - - } - }; - - private volatile boolean _stopped; @Override public void close(AMQConstant cause, String message) @@ -169,9 +171,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(Session_1_0 session, AMQConstant cause, String message) { - // TODO + session.close(cause, message); } @Override @@ -181,9 +183,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public List getSessionModels() + public List getSessionModels() { - return new ArrayList(_sessions); + return new ArrayList(_sessions); } @Override @@ -192,12 +194,6 @@ public class Connection_1_0 implements ConnectionEventListener return _logSubject; } - @Override - public String getUserName() - { - return getPrincipalAsString(); - } - @Override public boolean isSessionNameUnique(byte[] name) { @@ -216,7 +212,13 @@ public class Connection_1_0 implements ConnectionEventListener return _conn.getRemoteContainerId(); } - @Override + @Override + public String getRemoteContainerName() + { + return _conn.getRemoteContainerId(); + } + + @Override public String getClientVersion() { return ""; //TODO @@ -228,10 +230,9 @@ public class Connection_1_0 implements ConnectionEventListener return ""; //TODO } - @Override - public String getPrincipalAsString() + public Principal getAuthorizedPrincipal() { - return String.valueOf(_conn.getUser()); + return _conn.getUser(); } @Override @@ -337,11 +338,10 @@ public class Connection_1_0 implements ConnectionEventListener } - }; AMQConnectionModel getModel() { - return _model; + return this; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 546cc79f9e..f7e2d2df50 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,6 +30,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - queue = _vhost.createQueue( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - true, - Collections.EMPTY_MAP); + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.NAME, name); + attributes.put(Queue.DURABLE, isDurable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); + + queue = _vhost.createQueue(getSession(), attributes); } else { @@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Action deleteQueueTask = - new Action() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue", e); - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action() - { - public void performAction(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - qd = new QueueDestination(queue); } catch (QpidSecurityException e) @@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); throw new ConnectionScopedRuntimeException(e); } + catch (MessageSource.ConsumerAccessRefused e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"); + throw new ConnectionScopedRuntimeException(e); + } } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index c055d1e840..6840c7344a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -26,8 +26,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.LifetimePolicy; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; @@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; @@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject +public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject { private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + private final SessionEndpoint _endpoint; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap _openTransactions = new LinkedHashMap(); + + private final CopyOnWriteArrayList> _taskList = + new CopyOnWriteArrayList>(); + private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); + private AtomicBoolean _closed = new AtomicBoolean(); - public Session_1_0(VirtualHost vhost, final Connection_1_0 connection) + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) { _vhost = vhost; + _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; @@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); + Map attributes = new HashMap(); + attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); + attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); + attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); - final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - false, - properties); - - - - if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) + if(lifetimePolicy instanceof DeleteOnNoLinks) { - final Action deleteQueueTask = - new Action() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue from vhost", e); - } - } - } - }; - - _connection.addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action() - { - public void performAction(AMQQueue queue) - { - _connection.removeConnectionCloseTask(deleteQueueTask); - } - - - }); + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); + } + else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + { + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinks) + else if(lifetimePolicy instanceof DeleteOnClose) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } else if(lifetimePolicy instanceof DeleteOnNoMessages) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + else { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } + + + // TODO convert AMQP 1-0 node properties to queue attributes + + final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); } catch (QpidSecurityException e) { @@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } - public void forceEnd() - { - } - - @Override public UUID getId() { @@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public AMQConnectionModel getConnectionModel() + public Connection_1_0 getConnectionModel() { - return _connection.getModel(); + return _connection; } @Override @@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public void close() { - // TODO - required for AMQSessionModel / management initiated closing + performCloseTasks(); + _endpoint.end(); + } + + protected void performCloseTasks() + { + + if(_closed.compareAndSet(false, true)) + { + List> taskList = new ArrayList>(_taskList); + _taskList.clear(); + for(Action task : taskList) + { + task.performAction(this); + } + } } @Override public void close(AMQConstant cause, String message) { - // TODO - required for AMQSessionModel + performCloseTasks(); + final End end = new End(); + final Error theError = new Error(); + theError.setDescription(message); + theError.setCondition(ConnectionError.CONNECTION_FORCED); + end.setError(theError); + _endpoint.end(end); } @Override @@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public int getChannelId() { - // TODO - return 0; + return _endpoint.getSendingChannel(); } @Override @@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu connectionId, getClientID(), remoteAddress, - _vhost.getName(), // TODO - virtual host - 0) // TODO - channel) - + "] "; + _vhost.getName(), + _endpoint.getSendingChannel()) + "] "; } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(Session_1_0 o) { return getId().compareTo(o.getId()); } @@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu return _connection; } + @Override + public void addDeleteTask(final Action task) + { + if(!_closed.get()) + { + _taskList.add(task); + } + } + + @Override + public void removeDeleteTask(final Action task) + { + _taskList.remove(task); + } } diff --git a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 7d76a0ee8e..3ded20ae6a 100644 --- a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -971,33 +971,15 @@ class ManagementNode implements MessageSource session) { - return false; + return true; } @Override diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java index 407da0fd3f..ed5e195043 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java @@ -166,7 +166,7 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange public boolean isAutoDelete() { - return _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + return _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT; } public TabularData bindings() throws IOException, JMException diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 1365ceb06a..b44a752312 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; @@ -194,7 +195,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public boolean isAutoDelete() { - return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE; + return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT; } public Long getMaximumMessageAge() @@ -264,12 +265,29 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public boolean isExclusive() { - return (Boolean) _queue.getAttribute(Queue.EXCLUSIVE); + final Object attribute = _queue.getAttribute(Queue.EXCLUSIVE); + return attribute != null && attribute != ExclusivityPolicy.NONE; } public void setExclusive(boolean exclusive) { - _queue.setAttribute(Queue.EXCLUSIVE, isExclusive(), exclusive); + if(exclusive) + { + Object currentValue = _queue.getAttribute(Queue.EXCLUSIVE); + if(currentValue == null || currentValue == ExclusivityPolicy.NONE) + { + _queue.setAttribute(Queue.EXCLUSIVE, currentValue, ExclusivityPolicy.CONTAINER); + } + } + else + { + Object currentValue = _queue.getAttribute(Queue.EXCLUSIVE); + if(currentValue != null && currentValue != ExclusivityPolicy.NONE) + { + _queue.setAttribute(Queue.EXCLUSIVE, currentValue, ExclusivityPolicy.NONE); + } + } + } public void setAlternateExchange(String exchangeName) throws OperationsException diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 2c88f83405..c39c3f74e9 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -181,8 +181,13 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean createArgs = processNewQueueArguments(queueName, owner, originalArguments); - getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, - QueueArgumentsConverter.convertWireArgsToModel(createArgs)); + + final Map attributes = QueueArgumentsConverter.convertWireArgsToModel(createArgs); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.DURABLE, durable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + + getConfiguredObject().createQueue(attributes); } diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index f94c206512..2874168ddf 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Statistics; @@ -154,7 +155,7 @@ public class QueueMBeanTest extends QpidTestCase public void testIsAutoDelete() throws Exception { - when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.AUTO_DELETE); + when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); assertTrue(_queueMBean.isAutoDelete()); } @@ -224,22 +225,31 @@ public class QueueMBeanTest extends QpidTestCase testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l); } + + /********** Other attributes **********/ + + public void testIsExclusive() throws Exception { - assertAttribute("exclusive", Boolean.TRUE, Queue.EXCLUSIVE); + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.CONTAINER); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", true); } public void testIsNotExclusive() throws Exception { - assertAttribute("exclusive", Boolean.FALSE, Queue.EXCLUSIVE); + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", false); } public void testSetExclusive() throws Exception { - testSetAttribute("exclusive", Queue.EXCLUSIVE, Boolean.FALSE , Boolean.TRUE); - } + when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE); - /********** Other attributes **********/ + MBeanTestUtils.setMBeanAttribute(_queueMBean, "exclusive", Boolean.TRUE); + + verify(_mockQueue).setAttribute(Queue.EXCLUSIVE, ExclusivityPolicy.NONE, ExclusivityPolicy.CONTAINER); + + } public void testGetAlternateExchange() { diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index 4240dd5280..2dc2cb8d3b 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.mockito.ArgumentCaptor; public class VirtualHostManagerMBeanTest extends TestCase { @@ -68,8 +69,15 @@ public class VirtualHostManagerMBeanTest extends TestCase public void testCreateQueueWithNoOwner() throws Exception { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, null, true); + ArgumentCaptor argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); + + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME, actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, EMPTY_ARGUMENT_MAP); } /** @@ -79,9 +87,15 @@ public class VirtualHostManagerMBeanTest extends TestCase public void testCreateQueueWithOwnerMappedThroughToDescription() throws Exception { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); + ArgumentCaptor argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); - Map expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); + assertEquals(TEST_OWNER, actualAttributes.get(Queue.DESCRIPTION)); } public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception @@ -89,8 +103,15 @@ public class VirtualHostManagerMBeanTest extends TestCase Map arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); - Map expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION); - verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); + ArgumentCaptor argsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(_mockVirtualHost).createQueue(argsCaptor.capture()); + + Map actualAttributes = argsCaptor.getValue(); + assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME)); + assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE)); + assertEquals(null,actualAttributes.get(Queue.OWNER)); + assertEquals(TEST_DESCRIPTION, actualAttributes.get(Queue.DESCRIPTION)); } public void testDeleteQueue() throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java index 41d93b5ca2..c6b2c9e95c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java @@ -290,7 +290,7 @@ public class ModelTest extends QpidBrokerTestCase session.createQueue(new AMQShortString(queueName), autoDelete, durable, exclusive); - validateQueueViaJMX(queueName, exclusive ? connection.getClientID() : null, durable, autoDelete); + validateQueueViaJMX(queueName, (exclusive && durable &&!isBroker010()) ? connection.getClientID() : null, durable, autoDelete || (exclusive && !isBroker010() && !durable)); } /** diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index bdcdbe23c2..709d8ae34a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; + import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; @@ -41,6 +41,9 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; @@ -51,7 +54,6 @@ import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.queue.StandardQueue; -import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -539,11 +541,10 @@ public class MessageStoreTest extends QpidTestCase } } - private void setQueueExclusivity(boolean exclusive) throws AMQException + private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive { AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); - - queue.setExclusive(exclusive); + queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); } private void validateQueueExclusivityProperty(boolean expected) @@ -587,7 +588,7 @@ public class MessageStoreTest extends QpidTestCase assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); } - assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); + assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner()); assertEquals("Queue durability is not as expected", durable, queue.isDurable()); assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); } @@ -671,7 +672,7 @@ public class MessageStoreTest extends QpidTestCase throws Exception { - Map queueArguments = null; + Map queueArguments = new HashMap(); if(usePriority || lastValueQueue) { @@ -680,19 +681,27 @@ public class MessageStoreTest extends QpidTestCase if (usePriority) { - queueArguments = Collections.singletonMap(Queue.PRIORITIES, (Object) DEFAULT_PRIORTY_LEVEL); + queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) { - queueArguments = Collections.singletonMap(Queue.LVQ_KEY, (Object) LVQ_KEY); + queueArguments.put(Queue.LVQ_KEY, LVQ_KEY); } + queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + queueArguments.put(Queue.NAME, queueName); + queueArguments.put(Queue.DURABLE, durable); + queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + queueArguments.put(Queue.EXCLUSIVE, exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); + if(exclusive && queueOwner != null) + { + queueArguments.put(Queue.OWNER, queueOwner); + } AMQQueue queue = null; //Ideally we would be able to use the QueueDeclareHandler here. - queue = getVirtualHost().createQueue(UUIDGenerator.generateRandomUUID(), queueName, durable, queueOwner, false, exclusive, - false, queueArguments); + queue = getVirtualHost().createQueue(null, queueArguments); validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 3c15a45203..931974942f 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -130,9 +130,10 @@ public class QueueManagementTest extends QpidBrokerTestCase public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception { - Queue tmpQueue = _session.createTemporaryQueue(); + final String subName = "testOwner"; + _session.createDurableSubscriber(getTestTopic(), subName); - final String queueName = tmpQueue.getQueueName(); + final String queueName = _connection.getClientID() + ":" + subName; final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); assertNotNull(_connection.getClientID()); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index 9028130c18..1b27ee74f6 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -31,16 +31,7 @@ import java.util.Map; import javax.jms.JMSException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Connection; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Protocol; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.*; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class Asserts @@ -100,7 +91,8 @@ public class Asserts assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_TYPE, queueType, queueData.get(Queue.QUEUE_TYPE)); if (expectedAttributes == null) { - assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, Boolean.FALSE, queueData.get(Queue.EXCLUSIVE)); + assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, + ExclusivityPolicy.NONE.name(), queueData.get(Queue.EXCLUSIVE)); assertEquals("Unexpected value of queue attribute " + Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0, queueData.get(Queue.MAXIMUM_DELIVERY_ATTEMPTS)); assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 0, diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java index fec516bc2b..3833af3ca5 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java @@ -145,7 +145,6 @@ public class QueueRestTest extends QpidRestTestCase attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000); attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); - attributes.put(Queue.EXCLUSIVE, true); responseCode = getRestTestHelper().submitRequest("/rest/queue/test/" + queueName, "PUT", attributes); assertEquals("Setting of queue attribites should be allowed", 200, responseCode); @@ -158,7 +157,6 @@ public class QueueRestTest extends QpidRestTestCase assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) ); assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) ); assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) ); - assertEquals("Unexpected " + Queue.EXCLUSIVE, true, queueData.get(Queue.EXCLUSIVE) ); } public void testPutCreateBinding() throws Exception @@ -217,7 +215,7 @@ public class QueueRestTest extends QpidRestTestCase assertEquals("Unexpected binding attribute " + Consumer.NAME, "1", consumer.get(Consumer.NAME)); assertEquals("Unexpected binding attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE)); - assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.name(), + assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(), consumer.get(Consumer.LIFETIME_POLICY)); assertEquals("Unexpected binding attribute " + Consumer.DISTRIBUTION_MODE, "MOVE", consumer.get(Consumer.DISTRIBUTION_MODE)); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 96ca0c2def..d7b17fda8a 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -36,7 +36,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; @@ -291,7 +291,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "lvq", lvqQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); + assertEquals("Unexpected lvq key attribute", ConflationQueue.DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); } public void testPutCreateSortedQueueWithoutKey() throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index 6356b17e6f..6473a77855 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -321,14 +321,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase MessageConsumer consumer; if(durableSub) { - if (isBroker010()) - { - consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); - } - else - { - consumer = clientSession.createDurableSubscriber(clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); - } + consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); } else { diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index 67c2fcd5ad..cbdb0c1172 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -35,6 +35,9 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionSuspend org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnQueueArgumentsMismatch +// 0-10 exclusive queue is session exclusive and not container exclusive (so exclusive owner is not client-id) +org.apache.qpid.server.logging.DurableQueueLoggingTest#testQueueCreateDurableExclusive +org.apache.qpid.server.logging.TransientQueueLoggingTest#testQueueCreateDurableExclusive // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* @@ -69,3 +72,6 @@ org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessag // Java 0-10 client does not support re-binding the queue to the same exchange org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange + +// Java 0-10 exclusive queues are owned by sessions not by the client-id, so the owner is not meaningful from JMX +org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHasJmsClientIdAsOwner diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index 7f9a20e245..ff8bcfca68 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -78,3 +78,8 @@ org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchange // QPID-3396 org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired + +// Non durable exclusive queues are exclusive to a connection (and thus auto-delete - or at least +// not permanent), also their owner is not the container (client-id) but the connection +org.apache.qpid.server.logging.TransientQueueLoggingTest#testQueueCreateDurableExclusive + -- cgit v1.2.1