summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
commitc4794154e1c058d6761ef59f525deb68b42ef91f (patch)
treebe8fdcd5918be68b36cc983b40b8ad10d93244dd /java
parent9d4a19f07a6d54fbc0dbc59e750992f6abe45a40 (diff)
downloadqpid-python-c4794154e1c058d6761ef59f525deb68b42ef91f.tar.gz
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java3
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java8
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java5
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java10
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java252
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java5
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java9
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java179
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java10
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java4
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java16
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java24
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java10
-rw-r--r--java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Queue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java190
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java154
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java38
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java46
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java105
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java31
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java23
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java40
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java31
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java316
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java28
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java94
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java93
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java45
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java162
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java12
60 files changed, 1419 insertions, 883 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 2350e28ee2..a4383d94c4 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1403,6 +1403,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
if (_stateManager.isInState(State.ACTIVE))
{
+ LOGGER.debug("Storing configured object: " + configuredObject);
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
keyBinding.objectToEntry(configuredObject.getId(), key);
@@ -1430,6 +1431,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws AMQStoreException
{
+
+ LOGGER.debug("Removing configured object: " + id);
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
uuidBinding.objectToEntry(id, key);
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 464608ff59..5d84dc2c91 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.util.MapJsonSerializer;
@@ -91,6 +92,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString(), ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString(),
ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString() };
private static final Set<String> DEFAULT_EXCHANGES_SET = new HashSet<String>(Arrays.asList(DEFAULT_EXCHANGES));
+ private static final String ARGUMENTS = "arguments";
private MapJsonSerializer _serializer = new MapJsonSerializer();
@@ -580,10 +582,10 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
if (moveNonExclusiveOwnerToDescription(owner, exclusive))
{
- _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION);
+ _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION);
attributesMap.put(Queue.OWNER, null);
- argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner);
+ argumentsCopy.put(AMQShortString.valueOf(QueueArgumentsConverter.X_QPID_DESCRIPTION), owner);
}
else
{
@@ -591,7 +593,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
}
if (!argumentsCopy.isEmpty())
{
- attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy));
+ attributesMap.put(ARGUMENTS, FieldTable.convertToMap(argumentsCopy));
}
return attributesMap;
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
index 390d667db0..63af8d3840 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -48,6 +48,7 @@ import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.url.URLSyntaxException;
/**
@@ -159,7 +160,7 @@ public class BDBStoreUpgradeTestPreparer
session = connection.createSession(true, Session.SESSION_TRANSACTED);
// Create a priority queue on broker
final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
- priorityQueueArguments.put("x-qpid-priorities",10);
+ priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10);
createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
// Create a queue that has a DLQ
@@ -342,4 +343,4 @@ public class BDBStoreUpgradeTestPreparer
BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
producer.prepareBroker();
}
-} \ No newline at end of file
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index c33d427868..44f0861275 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
@@ -76,6 +77,7 @@ import com.sleepycat.je.Transaction;
public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
{
private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class);
+ private static final String ARGUMENTS = "arguments";
@Override
protected String getStoreDirectoryName()
@@ -287,12 +289,12 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null));
final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>();
- queueWithOwnerArguments.put("x-qpid-priorities", 10);
- queueWithOwnerArguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description");
+ queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10);
+ queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, "misused-owner-as-description");
expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments));
final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>();
- priorityQueueArguments.put("x-qpid-priorities", 10);
+ priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10);
expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments));
final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>();
@@ -388,7 +390,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
expectedQueueEntry.put(Queue.OWNER, owner);
if (argumentMap != null)
{
- expectedQueueEntry.put(Queue.ARGUMENTS, argumentMap);
+ expectedQueueEntry.put(ARGUMENTS, argumentMap);
}
return expectedQueueEntry;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8e79813216..60211823f8 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,24 +20,26 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.LinkedHashMap;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
@@ -61,6 +63,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
import org.apache.qpid.transport.*;
import java.nio.ByteBuffer;
@@ -72,11 +75,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
- /**
- * No-local queue argument is used to support the no-local feature of Durable Subscribers.
- */
- private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local";
-
public ServerSessionDelegate()
{
@@ -195,10 +193,9 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
+ VirtualHost vhost = getVirtualHost(session);
-
- final AMQQueue queue = queueRegistry.getQueue(queueName);
+ final AMQQueue queue = vhost.getQueue(queueName);
if(queue == null)
{
@@ -929,7 +926,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
{
@@ -947,7 +943,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
method.setBindingKey(method.getQueue());
}
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
@@ -991,7 +987,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
{
@@ -1007,7 +1002,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
@@ -1174,158 +1169,137 @@ public class ServerSessionDelegate extends SessionDelegate
private AMQQueue getQueue(Session session, String queue)
{
- QueueRegistry queueRegistry = getQueueRegistry(session);
- return queueRegistry.getQueue(queue);
- }
-
- private QueueRegistry getQueueRegistry(Session session)
- {
- return getVirtualHost(session).getQueueRegistry();
+ return getVirtualHost(session).getQueue(queue);
}
@Override
public void queueDeclare(Session session, final QueueDeclare method)
{
- VirtualHost virtualHost = getVirtualHost(session);
+ final VirtualHost virtualHost = getVirtualHost(session);
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
AMQQueue queue;
- QueueRegistry queueRegistry = getQueueRegistry(session);
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
- synchronized (queueRegistry)
+ final boolean exclusive = method.getExclusive();
+ final boolean autoDelete = method.getAutoDelete();
+
+ if(method.getPassive())
{
+ queue = virtualHost.getQueue(queueName);
- if (((queue = queueRegistry.getQueue(queueName)) == null))
+ if (queue == null)
{
+ String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
- if (method.getPassive())
- {
- String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
+ exception(session, method, errorCode, description);
- exception(session, method, errorCode, description);
+ }
+ else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ {
+ String description = "Cannot declare queue('" + queueName + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another session";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
- return;
- }
- else
+ exception(session, method, errorCode, description);
+
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
+ final String alternateExchangeName = method.getAlternateExchange();
+
+
+ final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments());
+
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
- try
- {
- queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
- if(!method.getExclusive() && method.getAutoDelete())
- {
- queue.setDeleteOnNoConsumers(true);
- }
+ arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName);
+ }
- final String alternateExchangeName = method.getAlternateExchange();
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
- {
- Exchange alternate = getExchange(session, alternateExchangeName);
- queue.setAlternateExchange(alternate);
- }
+ final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- if(method.hasArguments() && method.getArguments() != null)
- {
- if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL))
- {
- Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL);
- queue.setNoLocal(convertBooleanValue(noLocal));
- }
- }
+ final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
+ autoDelete, exclusive, deleteOnNoConsumer,
+ arguments);
- if (queue.isDurable() && !queue.isAutoDelete())
+ if (autoDelete && exclusive)
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task deleteQueueTask = new ServerSession.Task()
{
- if(method.hasArguments() && method.getArguments() != null)
+ public void doTask(ServerSession session)
{
- Map<String,Object> args = method.getArguments();
- FieldTable ftArgs = new FieldTable();
- for(Map.Entry<String, Object> entry : args.entrySet())
+ try
+ {
+ virtualHost.removeQueue(q);
+ }
+ catch (AMQException e)
{
- ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+ exception(session, method, e, "Cannot delete '" + method.getQueue());
}
- DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs);
}
- else
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(deleteQueueTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
{
- DurableConfigurationStoreHelper.createQueue(store, queue, null);
+ s.removeSessionCloseTask(deleteQueueTask);
}
- }
- queueRegistry.registerQueue(queue);
-
- if (method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
+ });
+ }
+ if (exclusive)
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task removeExclusive = new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
{
- final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- try
- {
- q.delete();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete '" + method.getQueue());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ q.setAuthorizationHolder(null);
+ q.setExclusiveOwningSession(null);
}
- if (method.hasExclusive()
- && method.getExclusive())
+ };
+ final ServerSession s = (ServerSession) session;
+ q.setExclusiveOwningSession(s);
+ s.addSessionCloseTask(removeExclusive);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
{
- final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ s.removeSessionCloseTask(removeExclusive);
}
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare queue '" + queueName);
- }
+ });
}
}
- else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ catch(QueueExistsException qe)
{
+ queue = qe.getExistingQueue();
+ if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ {
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
exception(session, method, errorCode, description);
-
- return;
+ }
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare queue '" + queueName);
}
}
}
@@ -1354,20 +1328,6 @@ public class ServerSessionDelegate extends SessionDelegate
return false;
}
- protected AMQQueue createQueue(final String queueName,
- final QueueDeclare body,
- VirtualHost virtualHost,
- final ServerSession session)
- throws AMQException
- {
- String owner = body.getExclusive() ? session.getClientID() : null;
-
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner,
- body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
-
- return queue;
- }
-
@Override
public void queueDelete(Session session, QueueDelete method)
{
@@ -1412,12 +1372,7 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- queue.delete();
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeQueue(store,queue);
- }
+ virtualHost.removeQueue(queue);
}
catch (AMQException e)
{
@@ -1471,7 +1426,14 @@ public class ServerSessionDelegate extends SessionDelegate
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
result.setAutoDelete(queue.isAutoDelete());
- result.setArguments(queue.getArguments());
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>();
+ Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+ for(String attrName : availableAttrs)
+ {
+ arguments.put(attrName, queue.getAttribute(attrName));
+ }
+ result.setArguments(QueueArgumentsConverter.convertModelArgsToWire(arguments));
result.setMessageCount(queue.getMessageCount());
result.setSubscriberCount(queue.getConsumerCount());
@@ -1491,7 +1453,7 @@ public class ServerSessionDelegate extends SessionDelegate
if(sub == null)
{
- exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'");
}
else if(sub.isStopped())
{
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index c6bceb6ac7..63582702cb 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -21,9 +21,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -33,6 +30,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.message.InboundMessage;
@@ -40,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -65,7 +64,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -169,9 +167,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
_queue = queue;
- Map<String, Object> arguments = queue.getArguments();
- _traceExclude = (String) arguments.get("qpid.trace.exclude");
- _trace = (String) arguments.get("qpid.trace.id");
+ _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
+ _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
String filterLogString = null;
_logActor = GenericActor.getInstance(this);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 6577efe292..4e620327c9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -73,7 +73,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern());
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 8883422989..5238a41e49 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -74,7 +74,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
else
{
channel.sync();
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
if (queue == null)
{
_log.info("No queue for '" + body.getQueue() + "'");
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 85f0a6fd3d..ba5692fc6c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -70,7 +70,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
final AMQChannel channel = session.getChannel(channelId);
@@ -115,7 +114,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
else
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
if (queue == null)
{
@@ -141,7 +140,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
}
else if (queueName != null)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
if (queue == null)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index a8e4e38422..359bd2eb19 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
@@ -73,7 +72,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
final AMQQueue queue;
final AMQShortString routingKey;
- if (body.getQueue() == null)
+ final AMQShortString queueName = body.getQueue();
+
+ if (queueName == null)
{
queue = channel.getDefaultQueue();
@@ -94,13 +95,13 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(queueName.toString());
routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
}
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
final Exchange exch = virtualHost.getExchange(exchangeName);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 9f887d881d..fd547d4bac 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
@@ -44,6 +45,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -61,8 +63,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
final AMQSessionModel session = protocolConnection.getChannel(channelId);
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
final AMQShortString queueName;
@@ -87,97 +87,103 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
throw body.getChannelNotFoundException(channelId);
}
- synchronized (queueRegistry)
+ if(body.getPassive())
{
- queue = queueRegistry.getQueue(queueName);
-
- AMQSessionModel owningSession = null;
-
- if (queue != null)
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
{
- owningSession = queue.getExclusiveOwningSession();
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
-
- if (queue == null)
+ else
{
- if (body.getPassive())
+ AMQSessionModel owningSession = queue.getExclusiveOwningSession();
+ if (queue.isExclusive() && !queue.isDurable()
+ && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
{
- String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
}
- else
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setAuthorizationHolder(protocolConnection);
+
+ if (body.getExclusive())
{
- queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
queue.setAuthorizationHolder(protocolConnection);
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments());
- }
- if(body.getAutoDelete())
- {
- queue.setDeleteOnNoConsumers(true);
- }
- queueRegistry.registerQueue(queue);
- if (body.getExclusive())
- {
- queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
- queue.setAuthorizationHolder(protocolConnection);
- if(!body.getDurable())
+ if(!body.getDurable())
+ {
+ final AMQQueue q = queue;
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- final AMQQueue q = queue;
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ public void doTask(AMQProtocolSession session) throws AMQException
{
- public void doTask(AMQProtocolSession session) throws AMQException
- {
- q.setExclusiveOwningSession(null);
- }
- };
- protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
- {
- protocolConnection.removeSessionCloseTask(sessionCloseTask);
- }
- });
- }
+ q.setExclusiveOwningSession(null);
+ }
+ };
+ protocolConnection.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ protocolConnection.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
}
}
- }
- else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
- }
- else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
- + queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
+ catch(QueueExistsException qe)
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
- + "as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
- }
- else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: "
- + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
- }
- else if(!body.getPassive() && queue.isDurable() != body.getDurable())
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: "
- + queue.isDurable() + " requested " + body.getDurable() + ")");
- }
+ queue = qe.getExistingQueue();
+ AMQSessionModel owningSession = queue.getExclusiveOwningSession();
+
+ if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+ else if(queue.isExclusive() != body.getExclusive())
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ }
+ else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+ + "as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
+ }
+ else if(queue.isAutoDelete() != body.getAutoDelete())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: "
+ + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ }
+ else if(queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: "
+ + queue.isDurable() + " requested " + body.getDurable() + ")");
+ }
+ }
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
@@ -204,30 +210,35 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
protected AMQQueue createQueue(final AMQShortString queueName,
QueueDeclareBody body,
- VirtualHost virtualHost,
+ final VirtualHost virtualHost,
final AMQProtocolSession session)
throws AMQException
{
- final QueueRegistry registry = virtualHost.getQueueRegistry();
- String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null;
- Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments());
+ final boolean durable = body.getDurable();
+ final boolean autoDelete = body.getAutoDelete();
+ final boolean exclusive = body.getExclusive();
+
+ String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
+
+ Map<String, Object> arguments =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
String queueNameString = AMQShortString.toString(queueName);
+ final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()),
- queueNameString, body.getDurable(), owner, body.getAutoDelete(),
- body.getExclusive(),virtualHost, arguments);
+ final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
+ exclusive, autoDelete, arguments);
- if (body.getExclusive() && !body.getDurable())
+ if (exclusive && !durable)
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
{
public void doTask(AMQProtocolSession session) throws AMQException
{
- if (registry.getQueue(queueName) == queue)
+ if (virtualHost.getQueue(queueName.toString()) == queue)
{
- queue.delete();
+ virtualHost.removeQueue(queue);
}
}
};
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
index 6f5e0ea992..a39faf2e70 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
@@ -62,7 +62,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
@@ -82,7 +81,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
}
if (queue == null)
@@ -112,12 +111,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
}
- int purged = queue.delete();
-
- if (queue.isDurable())
- {
- DurableConfigurationStoreHelper.removeQueue(store, queue);
- }
+ int purged = virtualHost.removeQueue(queue);
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
index e925eb7455..ff845d3c16 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
@@ -60,7 +60,6 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
@@ -84,7 +83,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
}
if(queue == null)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index aad5446cb5..20405b82ab 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -59,8 +59,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
final AMQQueue queue;
final AMQShortString routingKey;
@@ -87,7 +85,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index ca67b6f79b..35f24afbce 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -110,10 +110,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(destination instanceof QueueDestination)
{
queue = ((QueueDestination) _destination).getQueue();
- if(queue.getArguments() != null && queue.getArguments().containsKey("topic"))
+
+ if(queue.getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
+
qd = (QueueDestination) destination;
Map<Symbol,Filter> filters = source.getFilter();
@@ -194,19 +196,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
name = UUID.randomUUID().toString();
}
- queue = _vhost.getQueueRegistry().getQueue(name);
+ queue = _vhost.getQueue(name);
Exchange exchange = exchangeDestination.getExchange();
if(queue == null)
{
- queue = AMQQueueFactory.createAMQQueueImpl(
+ queue = _vhost.createQueue(
UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
name,
isDurable,
null,
true,
true,
- _vhost,
+ true,
Collections.EMPTY_MAP);
}
else
@@ -309,11 +311,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
public void doTask(Connection_1_0 session)
{
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ if (_vhost.getQueue(queueName) == tempQueue)
{
try
{
- tempQueue.delete();
+ _vhost.removeQueue(tempQueue);
}
catch (AMQException e)
{
@@ -417,7 +419,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
try
{
- queue.delete();
+ queue.getVirtualHost().removeQueue(queue);
}
catch(AMQException e)
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ed75a8c165..d3962c779c 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -107,7 +107,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ AMQQueue queue = _vhost.getQueue(addr);
if(queue != null)
{
@@ -256,7 +256,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
else
{
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ AMQQueue queue = _vhost.getQueue(addr);
if(queue != null)
{
@@ -329,14 +329,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
- final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
- queueName,
- false, // durable
- null, // owner
- false, // autodelete
- false, // exclusive
- _vhost,
- properties);
+ final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
+ queueName,
+ false, // durable
+ null, // owner
+ false, // autodelete
+ false, // exclusive
+ false,
+ properties);
@@ -347,11 +347,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
{
public void doTask(Connection_1_0 session)
{
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ if (_vhost.getQueue(queueName) == tempQueue)
{
try
{
- tempQueue.delete();
+ _vhost.removeQueue(tempQueue);
}
catch (AMQException e)
{
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
index 67ac1bdc7c..2c88f83405 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
@MBeanDescription("This MBean exposes the broker level management features")
public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker
@@ -180,7 +181,8 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
throws IOException, JMException
{
final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments);
- getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs);
+ getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l,
+ QueueArgumentsConverter.convertWireArgsToModel(createArgs));
}
@@ -196,11 +198,11 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
if (_moveNonExclusiveQueueOwnerToDescription && owner != null)
{
argumentsCopy = new HashMap<String, Object>(arguments == null ? new HashMap<String, Object>() : arguments);
- if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION))
+ if (!argumentsCopy.containsKey(QueueArgumentsConverter.X_QPID_DESCRIPTION))
{
- LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION);
+ LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION);
- argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner);
+ argumentsCopy.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, owner);
}
else
{
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
index e3fac9f711..4240dd5280 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class VirtualHostManagerMBeanTest extends TestCase
{
@@ -79,16 +80,16 @@ public class VirtualHostManagerMBeanTest extends TestCase
{
_virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true);
- Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER);
+ Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER);
verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
}
public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception
{
- Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
+ Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
_virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments);
- Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
+ Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION);
verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 53dd6df599..631490ab5f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -167,11 +167,6 @@ public abstract class AbstractExchange implements Exchange
return _virtualHost;
}
- public QueueRegistry getQueueRegistry()
- {
- return getVirtualHost().getQueueRegistry();
- }
-
public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
{
return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 2873eb31e8..8e9f980e6b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
{
+ private final QueueRegistry _queueRegistry;
private UUID _id;
private VirtualHost _virtualHost;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
@@ -55,6 +56,11 @@ public class DefaultExchange implements Exchange
private LogSubject _logSubject;
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
+ public DefaultExchange(QueueRegistry queueRegistry)
+ {
+ _queueRegistry = queueRegistry;
+ }
+
@Override
public void initialise(UUID id,
@@ -82,7 +88,7 @@ public class DefaultExchange implements Exchange
@Override
public long getBindingCount()
{
- return _virtualHost.getQueueRegistry().getQueues().size();
+ return _virtualHost.getQueues().size();
}
@Override
@@ -146,7 +152,7 @@ public class DefaultExchange implements Exchange
@Override
public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
{
- if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
+ if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
{
return convertToBinding(queue);
}
@@ -207,7 +213,7 @@ public class DefaultExchange implements Exchange
@Override
public List<AMQQueue> route(InboundMessage message)
{
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey());
+ AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
{
List<AMQQueue> noQueues = Collections.emptyList();
@@ -235,13 +241,13 @@ public class DefaultExchange implements Exchange
@Override
public boolean isBound(AMQShortString routingKey)
{
- return _virtualHost.getQueueRegistry().getQueue(routingKey) != null;
+ return _virtualHost.getQueue(routingKey == null ? null : routingKey.toString()) != null;
}
@Override
public boolean isBound(AMQQueue queue)
{
- return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue;
+ return _virtualHost.getQueue(queue.getName()) == queue;
}
@Override
@@ -283,7 +289,7 @@ public class DefaultExchange implements Exchange
@Override
public boolean isBound(String bindingKey)
{
- return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
+ return _virtualHost.getQueue(bindingKey) != null;
}
@Override
@@ -320,7 +326,7 @@ public class DefaultExchange implements Exchange
public Collection<Binding> getBindings()
{
List<Binding> bindings = new ArrayList<Binding>();
- for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues())
+ for(AMQQueue q : _virtualHost.getQueues())
{
bindings.add(convertToBinding(q));
}
@@ -330,7 +336,7 @@ public class DefaultExchange implements Exchange
@Override
public void addBindingListener(BindingListener listener)
{
- _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates.
+ _queueRegistry.addRegistryChangeListener(convertListener(listener));
}
private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 75c489c731..d8263a3c80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -40,20 +41,23 @@ import java.util.concurrent.ConcurrentMap;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
-
/**
* Maps from exchange name to exchange instance
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
- private VirtualHost _host;
+
+ private final VirtualHost _host;
+ private final QueueRegistry _queueRegistry;
+
private final Collection<RegistryChangeListener> _listeners =
Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
- public DefaultExchangeRegistry(VirtualHost host)
+ public DefaultExchangeRegistry(VirtualHost host, QueueRegistry queueRegistry)
{
_host = host;
+ _queueRegistry = queueRegistry;
}
public void initialise(ExchangeFactory exchangeFactory) throws AMQException
@@ -61,7 +65,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
//create 'standard' exchanges:
new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
- _defaultExchange = new DefaultExchange();
+ _defaultExchange = new DefaultExchange(_queueRegistry);
UUID defaultExchangeId =
UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
index 6fe0607ab2..ae2031bd71 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -89,6 +89,7 @@ public interface Queue extends ConfiguredObject
public static final String EXCLUSIVE = "exclusive";
public static final String MESSAGE_GROUP_KEY = "messageGroupKey";
public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
+ public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
public static final String LVQ_KEY = "lvqKey";
public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
public static final String NO_LOCAL = "noLocal";
@@ -100,6 +101,10 @@ public interface Queue extends ConfiguredObject
public static final String TYPE = "type";
public static final String PRIORITIES = "priorities";
+ public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
+
+ public static final String FEDERATION_EXCLUDES = "federationExcludes";
+ public static final String FEDERATION_ID = "federationId";
public static final Collection<String> AVAILABLE_ATTRIBUTES =
@@ -134,6 +139,7 @@ public interface Queue extends ConfiguredObject
PRIORITIES
));
+
//children
Collection<Binding> getBindings();
Collection<Consumer> getConsumers();
@@ -144,6 +150,6 @@ public interface Queue extends ConfiguredObject
void visit(QueueEntryVisitor visitor);
void delete();
-
+
void setNotificationListener(QueueNotificationListener listener);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index a84a041b72..26ac99d5bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -120,7 +120,7 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
- int CURRENT_CONFIG_VERSION = 2;
+ int CURRENT_CONFIG_VERSION = 3;
//children
Collection<VirtualHostAlias> getAliases();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 157b97cc07..96a7eacb92 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -66,25 +66,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
put(DESCRIPTION, String.class);
}});
- static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>();
- static
- {
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES);
- }
-
private final AMQQueue _queue;
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
@@ -190,15 +171,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
try
{
- QueueRegistry queueRegistry = _queue.getVirtualHost().getQueueRegistry();
- synchronized(queueRegistry)
- {
- _queue.delete();
- if (_queue.isDurable())
- {
- DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue);
- }
- }
+ _queue.getVirtualHost().removeQueue(_queue);
}
catch(AMQException e)
{
@@ -414,13 +387,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
- return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY);
+ return _queue.getAttribute(MESSAGE_GROUP_KEY);
}
else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
//We only return the boolean value if message groups are actually in use
- return getAttribute(MESSAGE_GROUP_KEY) == null ? null :
- SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+ return getAttribute(MESSAGE_GROUP_KEY) == null ? null : _queue.getAttribute(MESSAGE_GROUP_SHARED_GROUPS);
}
else if(LVQ_KEY.equals(name))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index c09dd9449e..977fd5ae56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -41,12 +41,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
-import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -68,6 +65,7 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -86,6 +84,7 @@ import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener
{
@@ -203,7 +202,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
private void populateQueues()
{
- Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues();
+ Collection<AMQQueue> actualQueues = _virtualHost.getQueues();
if ( actualQueues != null )
{
synchronized(_queueAdapters)
@@ -399,7 +398,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null)
{
- attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY);
+ attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
}
else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null)
{
@@ -415,7 +414,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes);
attributes.remove(Queue.MESSAGE_GROUP_KEY);
- attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key);
+ attributes.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, key);
}
if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS))
@@ -423,7 +422,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes))
{
attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS);
- attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ attributes.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
}
}
@@ -440,15 +439,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
attributes.remove(Queue.LIFETIME_POLICY);
attributes.remove(Queue.TIME_TO_LIVE);
- List<String> attrNames = new ArrayList<String>(attributes.keySet());
- for(String attr : attrNames)
- {
- if(QueueAdapter.ATTRIBUTE_MAPPINGS.containsKey(attr))
- {
- attributes.put(QueueAdapter.ATTRIBUTE_MAPPINGS.get(attr),attributes.remove(attr));
- }
- }
-
return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes);
}
@@ -472,33 +462,26 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
owner = authenticatedPrincipal.getName();
}
}
+
+ final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE;
+
try
{
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- synchronized (queueRegistry)
- {
- if(_virtualHost.getQueueRegistry().getQueue(name)!=null)
- {
- throw new IllegalArgumentException("Queue with name "+name+" already exists");
- }
- AMQQueue queue =
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
- durable, owner, lifetime == LifetimePolicy.AUTO_DELETE,
- exclusive, _virtualHost, attributes);
- if(durable)
- {
- DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(),
- queue,
- FieldTable.convertToFieldTable(attributes));
- }
- synchronized (_queueAdapters)
- {
- return _queueAdapters.get(queue);
- }
+ AMQQueue queue =
+ _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
+ durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes);
+
+ synchronized (_queueAdapters)
+ {
+ return _queueAdapters.get(queue);
}
}
+ catch(QueueExistsException qe)
+ {
+ throw new IllegalArgumentException("Queue with name "+name+" already exists");
+ }
catch(AMQException e)
{
throw new IllegalArgumentException(e);
@@ -1057,7 +1040,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
if(VirtualHost.QUEUE_COUNT.equals(name))
{
- return _vhost.getQueueRegistry().getQueues().size();
+ return _vhost.getQueues().size();
}
else if(VirtualHost.EXCHANGE_COUNT.equals(name))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4f610cc925..cb6a9249d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -225,7 +225,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
void setAlternateExchange(Exchange exchange);
- Map<String, Object> getArguments();
+ Collection<String> getAvailableAttributes();
+ Object getAttribute(String attrName);
void checkCapacity(AMQSessionModel channel);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 1eeb6dccf3..5001c2fd2b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -29,42 +29,31 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQQueueFactory
+public class AMQQueueFactory implements QueueFactory
{
- public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
- public static final String X_QPID_CAPACITY = "x-qpid-capacity";
- public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
- public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
- public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
- public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
- public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
-
- public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
- public static final String X_QPID_DESCRIPTION = "x-qpid-description";
- public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
- public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
- public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+
- public static final String DLQ_ROUTING_KEY = "dlq";
- public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
- public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+
+ private final VirtualHost _virtualHost;
+ private final QueueRegistry _queueRegistry;
- private AMQQueueFactory()
+ public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry)
{
+ _virtualHost = virtualHost;
+ _queueRegistry = queueRegistry;
}
private abstract static class QueueProperty
@@ -129,56 +118,56 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumQueueDepth(value);
}
},
- new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
+ new QueueLongProperty(Queue.ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty(X_QPID_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setFlowResumeCapacity(value);
}
},
- new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS)
{
public void setPropertyValue(AMQQueue queue, int value)
{
@@ -189,13 +178,17 @@ public class AMQQueueFactory
/**
* @param id the id to use.
+ * @param deleteOnNoConsumer
*/
- public static AMQQueue createAMQQueueImpl(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ @Override
+ public AMQQueue createAMQQueueImpl(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
if (id == null)
{
@@ -206,16 +199,11 @@ public class AMQQueueFactory
throw new IllegalArgumentException("Queue name must not be null");
}
- // Access check
- if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
- {
- String description = "Permission denied: queue-name '" + queueName + "'";
- throw new AMQSecurityException(description);
- }
- QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
- boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
- if (isDLQEnabled)
+ QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
+
+ boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration);
+ if (createDLQ)
{
validateDLNames(queueName);
}
@@ -226,17 +214,17 @@ public class AMQQueueFactory
if(arguments != null)
{
- if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ if(arguments.containsKey(Queue.LVQ_KEY))
{
- conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY);
+ conflationKey = (String) arguments.get(Queue.LVQ_KEY);
if(conflationKey == null)
{
- conflationKey = QPID_LVQ_KEY;
+ conflationKey = QPID_DEFAULT_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES))
+ else if(arguments.containsKey(Queue.PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ Object prioritiesObj = arguments.get(Queue.PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
@@ -257,33 +245,36 @@ public class AMQQueueFactory
// TODO - should warn here of invalid format
}
}
- else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ else if(arguments.containsKey(Queue.SORT_KEY))
{
- sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ sortingKey = (String)arguments.get(Queue.SORT_KEY);
}
}
AMQQueue q;
if(sortingKey != null)
{
- q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey);
}
else if(conflationKey != null)
{
- q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+ q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+ q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+ q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
}
+ q.setDeleteOnNoConsumers(deleteOnNoConsumer);
+
//Register the new queue
- virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+ _queueRegistry.registerQueue(q);
+
+ q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName));
if(arguments != null)
{
@@ -294,21 +285,25 @@ public class AMQQueueFactory
p.setPropertyValue(q, arguments.get(p.getArgumentName().toString()));
}
}
+
+ if(arguments.get(Queue.NO_LOCAL) instanceof Boolean)
+ {
+ q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL));
+ }
+
}
- if(isDLQEnabled)
+ if(createDLQ)
{
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
Exchange dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName());
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
try
{
- dlExchange = virtualHost.createExchange(dlExchangeId,
+ dlExchange = _virtualHost.createExchange(dlExchangeId,
dlExchangeName,
ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
true, false, null);
@@ -321,23 +316,19 @@ public class AMQQueueFactory
AMQQueue dlQueue = null;
- synchronized(queueRegistry)
+ synchronized(_queueRegistry)
{
- dlQueue = queueRegistry.getQueue(dlQueueName);
+ dlQueue = _queueRegistry.getQueue(dlQueueName);
if(dlQueue == null)
{
//set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
final Map<String, Object> args = new HashMap<String, Object>();
- args.put(X_QPID_DLQ_ENABLED, false);
- args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
-
- dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
- //enter the dlq in the persistent store
- DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(),
- dlQueue,
- FieldTable.convertToFieldTable(args));
+ dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive,
+ false, args);
}
}
@@ -350,11 +341,31 @@ public class AMQQueueFactory
}
q.setAlternateExchange(dlExchange);
}
+ else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+ {
+
+ final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE);
+ Exchange altExchange;
+ try
+ {
+ altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
+ }
+ catch(IllegalArgumentException e)
+ {
+ altExchange = _virtualHost.getExchange(altExchangeAttr);
+ }
+ q.setAlternateExchange(altExchange);
+ }
+
+ if (q.isDurable() && !q.isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q);
+ }
return q;
}
- public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
+ public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException
{
String queueName = config.getName();
@@ -365,9 +376,9 @@ public class AMQQueueFactory
Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
// we need queues that are defined in config to have deterministic ids.
- UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName());
+ UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName());
- AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
+ AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, false, arguments);
q.configure(config);
return q;
}
@@ -414,21 +425,23 @@ public class AMQQueueFactory
* queue configuration
* @return true if DLQ enabled
*/
- protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
{
//feature is not to be enabled for temporary queues or when explicitly disabled by argument
- if (!autoDelete)
+ if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
- boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ boolean dlqArgumentPresent = arguments != null
+ && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
{
boolean dlqEnabled = true;
if (dlqArgumentPresent)
{
- Object argument = arguments.get(X_QPID_DLQ_ENABLED);
- dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
}
- return dlqEnabled;
+ return dlqEnabled ;
}
}
return false;
@@ -464,31 +477,30 @@ public class AMQQueueFactory
if(config.getArguments() != null && !config.getArguments().isEmpty())
{
- arguments.putAll(config.getArguments());
+ arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
}
if(config.isLVQ() || config.getLVQKey() != null)
{
- arguments.put(QPID_LAST_VALUE_QUEUE, 1);
- arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+ arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey());
}
else if (config.getPriority() || config.getPriorities() > 0)
{
- arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
}
else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
{
- arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ arguments.put(Queue.SORT_KEY, config.getQueueSortKey());
}
if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- arguments.put(X_QPID_DLQ_ENABLED, true);
+ arguments.put(Queue.CREATE_DLQ_ON_CREATION, true);
}
if (config.getDescription() != null && !"".equals(config.getDescription()))
{
- arguments.put(X_QPID_DESCRIPTION, config.getDescription());
+ arguments.put(Queue.DESCRIPTION, config.getDescription());
}
if (arguments.isEmpty())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index 27a9e13617..7308433759 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -59,8 +59,9 @@ public class DefaultQueueRegistry implements QueueRegistry
}
}
- public void unregisterQueue(AMQShortString name)
+ public void unregisterQueue(String nameString)
{
+ AMQShortString name = new AMQShortString(nameString);
AMQQueue q = _queueMap.remove(name);
if(q != null)
{
@@ -74,16 +75,11 @@ public class DefaultQueueRegistry implements QueueRegistry
}
}
- public AMQQueue getQueue(AMQShortString name)
+ private AMQQueue getQueue(AMQShortString name)
{
return _queueMap.get(name);
}
- public Collection<AMQShortString> getQueueNames()
- {
- return _queueMap.keySet();
- }
-
public Collection<AMQQueue> getQueues()
{
return _queueMap.values();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
new file mode 100644
index 0000000000..f5bee850c2
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.qpid.server.model.Queue;
+
+public class QueueArgumentsConverter
+{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
+ public static final String QPID_ALERT_COUNT = "qpid.alert_count";
+ public static final String QPID_ALERT_SIZE = "qpid.alert_size";
+ public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
+
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
+
+ public static final String X_QPID_DESCRIPTION = "x-qpid-description";
+ /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ */
+ public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+ public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
+ public static final String QPID_TRACE_ID = "qpid.trace.id";
+
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+
+ /**
+ * No-local queue argument is used to support the no-local feature of Durable Subscribers.
+ */
+ public static final String QPID_NO_LOCAL = "no-local";
+ static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+ static
+ {
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_AGE, Queue.ALERT_THRESHOLD_MESSAGE_AGE);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_SIZE, Queue.ALERT_THRESHOLD_MESSAGE_SIZE);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_QUEUE_DEPTH, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_SIZE, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+
+ ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY);
+ ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION);
+ ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
+ //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS);
+ ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+ ATTRIBUTE_MAPPINGS.put(QPID_TRACE_EXCLUDE, Queue.FEDERATION_EXCLUDES);
+ ATTRIBUTE_MAPPINGS.put(QPID_TRACE_ID, Queue.FEDERATION_ID);
+ ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+
+ }
+
+
+ public static Map<String,Object> convertWireArgsToModel(Map<String,Object> wireArguments)
+ {
+ Map<String,Object> modelArguments = new HashMap<String, Object>();
+ if(wireArguments != null)
+ {
+ for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
+ {
+ if(wireArguments.containsKey(entry.getKey()))
+ {
+ modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
+ }
+ }
+ if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ {
+ modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+ }
+ if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
+ {
+ modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
+ SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(wireArguments.get(QPID_SHARED_MSG_GROUP)));
+ }
+ if(wireArguments.get(X_QPID_DLQ_ENABLED) != null)
+ {
+ modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString()));
+ }
+
+ if(wireArguments.get(QPID_NO_LOCAL) != null)
+ {
+ modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
+ }
+
+ }
+ return modelArguments;
+ }
+
+
+ public static Map<String,Object> convertModelArgsToWire(Map<String,Object> modelArguments)
+ {
+ Map<String,Object> wireArguments = new HashMap<String, Object>();
+ for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
+ {
+ if(modelArguments.containsKey(entry.getValue()))
+ {
+ wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+ }
+ }
+
+ if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ {
+ wireArguments.put(QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ }
+
+ return wireArguments;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
new file mode 100644
index 0000000000..5411a2bc9c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+
+public interface QueueFactory
+{
+ AMQQueue createAMQQueueImpl(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index e8c34128e9..bc1d5942bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -32,11 +31,7 @@ public interface QueueRegistry
void registerQueue(AMQQueue queue);
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
+ void unregisterQueue(String name);
Collection<AMQQueue> getQueues();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b0ab93162a..e3dbd62b6c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
@@ -68,12 +69,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
- public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
- private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
private static final String QPID_NO_GROUP = "qpid.no-group";
private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
+
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
@@ -237,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
+ _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -255,19 +254,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
durable, !durable,
_entries.getPriorities() > 0));
- if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
{
- if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE))
+ if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+ && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
{
- Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG);
+ Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
else
@@ -358,13 +359,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_alternateExchange = exchange;
}
- /**
- * Arguments used to create this queue. The caller is assured
- * that null will never be returned.
- */
- public Map<String, Object> getArguments()
+
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return new ArrayList<String>(_arguments.keySet());
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
{
- return _arguments;
+ return _arguments.get(attrName);
}
public boolean isAutoDelete()
@@ -511,7 +516,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_logger.info("Auto-deleteing queue:" + this);
}
- delete();
+ getVirtualHost().removeQueue(this);
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -1340,7 +1345,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -2282,18 +2286,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
if (description == null)
{
- _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+ _arguments.remove(Queue.DESCRIPTION);
}
else
{
- _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+ _arguments.put(Queue.DESCRIPTION, description);
}
}
@Override
public String getDescription()
{
- return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+ return (String) _arguments.get(Queue.DESCRIPTION);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 931368cb97..960986ec45 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -167,12 +167,12 @@ public class SecurityManager implements ConfigurationChangeListener
{
String pluginTypeName = getPluginTypeName(accessControl);
_hostPlugins.put(pluginTypeName, accessControl);
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Added access control to host plugins with name: " + vhostName);
}
-
+
break;
}
}
@@ -366,7 +366,7 @@ public class SecurityManager implements ConfigurationChangeListener
}
public boolean authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive,
- final Boolean nowait, final Boolean passive, final AMQShortString queueName, final String owner)
+ final Boolean nowait, final Boolean passive, final String queueName, final String owner)
{
return checkAllPlugins(new AccessCheck()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
index 6c631fc360..893b371d11 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
@@ -212,7 +212,7 @@ public class ObjectProperties
}
public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive,
- AMQShortString queueName, String owner)
+ String queueName, String owner)
{
super();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index efb1e95e99..e9181c0e12 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class DurableConfigurationStoreHelper
{
@@ -46,28 +48,23 @@ public class DurableConfigurationStoreHelper
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+
if (queue.getAlternateExchange() != null)
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- else
- {
- attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
- }
- if (attributesMap.containsKey(Queue.ARGUMENTS))
- {
- // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
- Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
- currentArgs.putAll(queue.getArguments());
- }
- else
+
+ Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+ for(String attrName : availableAttrs)
{
- attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+ attributesMap.put(attrName, queue.getAttribute(attrName));
}
+
store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
@@ -78,11 +75,9 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
- // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
- if (arguments != null)
+ for(String attrName : queue.getAvailableAttributes())
{
- attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
+ attributesMap.put(attrName, queue.getAttribute(attrName));
}
store.create(queue.getId(), QUEUE,attributesMap);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 4e27a008dd..d87431a415 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -58,11 +59,13 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
@@ -95,6 +98,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
+ private final AMQQueueFactory _queueFactory;
private volatile State _state = State.INITIALISING;
@@ -136,11 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
+
_queueRegistry = new DefaultQueueRegistry(this);
+ _queueFactory = new AMQQueueFactory(this, _queueRegistry);
+
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
initialiseStatistics();
@@ -298,12 +305,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration);
String queueName = queue.getName();
if (queue.isDurable())
{
- DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
+ DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
}
//get the exchange name (returns default exchange name if none was specified)
@@ -428,12 +435,102 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return _queueRegistry.getQueue(name);
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return _queueRegistry.getQueue(id);
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return _queueRegistry.getQueues();
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ synchronized (getQueueRegistry())
+ {
+ int purged = queue.delete();
+
+ getQueueRegistry().unregisterQueue(queue.getName());
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = getDurableConfigurationStore();
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
+ }
+ return purged;
+ }
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+ // Access check
+ if (!getSecurityManager().authoriseCreateQueue(autoDelete,
+ durable,
+ exclusive,
+ null,
+ null,
+ queueName,
+ owner))
+ {
+ String description = "Permission denied: queue-name '" + queueName + "'";
+ throw new AMQSecurityException(description);
+ }
+
+ synchronized (_queueRegistry)
+ {
+ if(_queueRegistry.getQueue(queueName) != null)
+ {
+ throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ if(id == null)
+ {
+
+ id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ while(_queueRegistry.getQueue(id) != null)
+ {
+ id = UUID.randomUUID();
+ }
+
+ }
+ else if(_queueRegistry.getQueue(id) != null)
+ {
+ throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ return _queueFactory.createAMQQueueImpl(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
+ arguments);
+ }
+
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return _exchangeRegistry.getExchange(name);
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return _exchangeRegistry.getExchange(id);
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return _exchangeRegistry.getDefaultExchange();
@@ -747,7 +844,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this, getExchangeRegistry()),
+ new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
new BindingRecoverer(this, getExchangeRegistry())
};
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index 7cfadbcadf..2d3a620e91 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -91,7 +91,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
{
_unresolvedDependencies.add(new ExchangeDependency());
}
- _queue = _virtualHost.getQueueRegistry().getQueue(_queueId);
+ _queue = _virtualHost.getQueue(_queueId);
if(_queue == null)
{
_unresolvedDependencies.add(new QueueDependency());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 3526551073..8d05e719ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
@@ -60,6 +61,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider
currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader());
case 1:
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
+ case 2:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
+
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -213,7 +217,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider
UUID queueId = UUID.fromString(queueIdString);
ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()))
- || _virtualHost.getQueueRegistry().getQueue(queueId) != null);
+ || _virtualHost.getQueue(queueId) != null);
}
private boolean isBinding(final String type)
@@ -224,4 +228,39 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
+ /*
+ * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
+ * attributes into the map using the model attribute names rather than the wire attribute names
+ */
+ private class Version2Upgrader extends NonNullUpgrader
+ {
+
+ private static final String ARGUMENTS = "arguments";
+
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ if(Queue.class.getSimpleName().equals(type))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
+ if(attributes.get(ARGUMENTS) instanceof Map)
+ {
+ newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes
+ .get(ARGUMENTS)));
+ }
+ newAttributes.putAll(attributes);
+ attributes = newAttributes;
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+ }
+
+ getNextUpgrader().configuredObject(id,type,attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 7929cd3e39..b4fbdf7544 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -20,18 +20,19 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
@@ -41,11 +42,15 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
private final VirtualHost _virtualHost;
private final ExchangeRegistry _exchangeRegistry;
+ private final QueueFactory _queueFactory;
- public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry)
+ public QueueRecoverer(final VirtualHost virtualHost,
+ final ExchangeRegistry exchangeRegistry,
+ final QueueFactory queueFactory)
{
_virtualHost = virtualHost;
_exchangeRegistry = exchangeRegistry;
+ _queueFactory = queueFactory;
}
@Override
@@ -101,26 +106,24 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
String queueName = (String) _attributes.get(Queue.NAME);
String owner = (String) _attributes.get(Queue.OWNER);
boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
- @SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS);
+
+ Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
+ queueArgumentsMap.remove(Queue.NAME);
+ queueArgumentsMap.remove(Queue.OWNER);
+ queueArgumentsMap.remove(Queue.EXCLUSIVE);
+
try
{
- _queue = _virtualHost.getQueueRegistry().getQueue(_id);
+ _queue = _virtualHost.getQueue(_id);
if(_queue == null)
{
- _queue = _virtualHost.getQueueRegistry().getQueue(queueName);
+ _queue = _virtualHost.getQueue(queueName);
}
if (_queue == null)
{
- _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost,
- queueArgumentsMap);
- _virtualHost.getQueueRegistry().registerQueue(_queue);
-
- if (_alternateExchange != null)
- {
- _queue.setAlternateExchange(_alternateExchange);
- }
+ _queue = _queueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive,
+ false, queueArgumentsMap);
}
}
catch (AMQException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index e06e785338..2ebbedccd4 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -21,15 +21,18 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -45,7 +48,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
String getName();
- QueueRegistry getQueueRegistry();
+ AMQQueue getQueue(String name);
+
+ AMQQueue getQueue(UUID id);
+
+ Collection<AMQQueue> getQueues();
+
+ int removeQueue(AMQQueue queue) throws AMQException;
+
+ AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException;
+
Exchange createExchange(UUID id,
String exchange,
@@ -58,6 +77,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
void removeExchange(Exchange exchange, boolean force) throws AMQException;
Exchange getExchange(String name);
+ Exchange getExchange(UUID id);
+
Exchange getDefaultExchange();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 3738306f6a..39ca3197b4 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -179,7 +179,7 @@ public class VirtualHostConfigRecoveryHandler implements
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -268,7 +268,7 @@ public class VirtualHostConfigRecoveryHandler implements
public void queueEntry(final UUID queueId, long messageId)
{
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
+ AMQQueue queue = _virtualHost.getQueue(queueId);
try
{
if(queue != null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java
new file mode 100644
index 0000000000..54f7d0d172
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueExistsException extends AMQException
+{
+ private final AMQQueue _existing;
+
+ public QueueExistsException(String name, AMQQueue existing)
+ {
+ super(name);
+ _existing = existing;
+ }
+
+ public AMQQueue getExistingQueue()
+ {
+ return _existing;
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index de6d036f29..dc9ddf7b32 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check that atest was a priority queue with 5 priorities
- AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue atest = vhost.getQueue("atest");
assertTrue(atest instanceof AMQPriorityQueue);
assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
// Check that ptest was a priority queue with 10 priorities
- AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
+ AMQQueue ptest = vhost.getQueue("ptest");
assertTrue(ptest instanceof AMQPriorityQueue);
assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
// Check that ntest wasn't a priority queue
- AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
+ AMQQueue ntest = vhost.getQueue("ntest");
assertFalse(ntest instanceof AMQPriorityQueue);
}
@@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check specifically configured values
- AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue aTest = vhost.getQueue("atest");
assertEquals(4, aTest.getMaximumQueueDepth());
assertEquals(5, aTest.getMaximumMessageSize());
assertEquals(6, aTest.getMaximumMessageAge());
// Check default values
- AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
+ AMQQueue bTest = vhost.getQueue("btest");
assertEquals(1, bTest.getMaximumQueueDepth());
assertEquals(2, bTest.getMaximumMessageSize());
assertEquals(3, bTest.getMaximumMessageAge());
@@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase
assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
// Get queues
- AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
- AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
- AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
- AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+ AMQQueue biggles = test.getQueue("biggles");
+ AMQQueue beetle = test.getQueue("beetle");
+ AMQQueue r2d2 = extra.getQueue("r2d2");
+ AMQQueue c3p0 = extra.getQueue("c3p0");
// Disabled specifically for this queue, overriding virtualhost setting
assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f2a64381df..7adec3d595 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
routeMessage("a.c.d.b",0l);
@@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchafterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testSubMatchFails() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
int queueCount = routeMessage("a.b.c",0l);
@@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreRouting() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreQueue() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 693fd16b9f..a468fa072b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
+import org.apache.qpid.server.model.Queue;
import static org.mockito.Mockito.when;
@@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
@Override
public void setUp() throws Exception
{
- FieldTable arguments = new FieldTable();
- arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3);
- setArguments(arguments);
+ setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3));
super.setUp();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index c8e0e53d75..62c9b4c46d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,91 +20,213 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.XMLConfiguration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class AMQQueueFactoryTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private VirtualHost _virtualHost;
- private Broker _broker;
+ private AMQQueueFactory _queueFactory;
+ private List<AMQQueue> _queues;
+ private QueueConfiguration _queueConfiguration;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- XMLConfiguration configXml = new XMLConfiguration();
- configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName());
+ _queues = new ArrayList<AMQQueue>();
+ _virtualHost = mock(VirtualHost.class);
- _broker = BrokerTestHelper.createBrokerMock();
- if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings"))
- {
- when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5);
- when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true);
- }
+ VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class);
+ when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
+ _queueConfiguration = mock(QueueConfiguration.class);
+ when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration);
+ LogActor logActor = mock(LogActor.class);
+ CurrentActor.set(logActor);
+ RootMessageLogger rootLogger = mock(RootMessageLogger.class);
+ when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
+ DurableConfigurationStore store = mock(DurableConfigurationStore.class);
+ when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
+
+ mockExchangeCreation();
+ mockQueueRegistry();
+ delegateVhostQueueCreation();
+
+ when(_virtualHost.getQueues()).thenReturn(_queues);
+
+
+ _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry);
- _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker));
- _queueRegistry = _virtualHost.getQueueRegistry();
}
+ private void delegateVhostQueueCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class);
+
+ when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(),
+ autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then(
+ new Answer<AMQQueue>()
+ {
+ @Override
+ public AMQQueue answer(InvocationOnMock invocation) throws Throwable
+ {
+ return _queueFactory.createAMQQueueImpl(id.getValue(),
+ queueName.getValue(),
+ durable.getValue(),
+ owner.getValue(),
+ autoDelete.getValue(),
+ exclusive.getValue(),
+ deleteOnNoConsumer.getValue(),
+ arguments.getValue());
+ }
+ }
+ );
+ }
+
+ private void mockQueueRegistry()
+ {
+ _queueRegistry = mock(QueueRegistry.class);
+
+ final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class);
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ AMQQueue queue = capturedQueue.getValue();
+ when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue);
+ _queues.add(queue);
+
+ return null;
+ }
+ }).when(_queueRegistry).registerQueue(capturedQueue.capture());
+ }
+
+ private void mockExchangeCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class);
+
+ when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(),
+ anyBoolean(), anyBoolean(), anyString())).then(
+ new Answer<Exchange>()
+ {
+ @Override
+ public Exchange answer(InvocationOnMock invocation) throws Throwable
+ {
+ final String name = exchangeNameCapture.getValue();
+ final UUID id = idCapture.getValue();
+
+ final Exchange exchange = mock(Exchange.class);
+ ExchangeType exType = mock(ExchangeType.class);
+
+ when(exchange.getName()).thenReturn(name);
+ when(exchange.getId()).thenReturn(id);
+ when(exchange.getType()).thenReturn(exType);
+ final String typeName = type.getValue();
+ when(exType.getType()).thenReturn(typeName);
+ when(exType.getName()).thenReturn(new AMQShortString(typeName));
+
+ when(_virtualHost.getExchange(eq(name))).thenReturn(exchange);
+ when(_virtualHost.getExchange(eq(id))).thenReturn(exchange);
+
+ final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class);
+
+ when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() {
+
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ when(exchange.isBound(eq(queue.getValue()))).thenReturn(true);
+ return true;
+ }
+ });
+
+ return exchange;
+ }
+ }
+ );
+ }
+
@Override
public void tearDown() throws Exception
{
- try
- {
- _virtualHost.close();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
+ super.tearDown();
}
private void verifyRegisteredQueueCount(int count)
{
- assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
+ assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size());
}
private void verifyQueueRegistered(String queueName)
{
- assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName));
}
public void testPriorityQueueRegistration() throws Exception
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
- false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
+ false,
+ false,
+ attributes);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testSimpleQueueRegistration() throws Exception
{
String queueName = getName();
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
- false, _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
+ false,
+ false,
+ null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
//verify that no alternate exchange or DLQ were produced
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName));
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
* cause the alternate exchange to be set and DLQ to be produced.
* @throws AMQException
*/
public void testDeadLetterQueueEnabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase
*/
public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
{
+
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5);
+ when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true);
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ null);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
@@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
* result in the alternate exchange being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueDisabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false);
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
* creating an auto-delete queue, does not result in the alternate exchange
* being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
//create an autodelete queue
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
+ false,
+ attributes);
assertTrue("Queue should be autodelete", queue.isAutoDelete());
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
* the desired effect.
*/
public void testMaximumDeliveryCount() throws Exception
{
- final FieldTable fieldTable = new FieldTable();
- fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
* that queue is created with a default maximumDeliveryCount of zero (unless set in config).
*/
public void testMaximumDeliveryCountDefault() throws Exception
{
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, null);
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ null);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
try
{
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false,
+ false,
+ null);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testMessageGroupFromConfig() throws Exception
{
- PropertiesConfiguration queueConfig = new PropertiesConfiguration();
- queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey");
- queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1");
+ Map<String,String> arguments = new HashMap<String, String>();
+ arguments.put("qpid.group_header_key","mykey");
+ arguments.put("qpid.shared_msg_group","1");
+ QueueConfiguration qConf = mock(QueueConfiguration.class);
+ when(qConf.getArguments()).thenReturn(arguments);
+ when(qConf.getName()).thenReturn("test");
- final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);;
- QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost);
- assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY));
- assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf);
+ assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
+ assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
private String generateStringWithLength(char ch, int length)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b677ece408..e490db288c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue
}
- public Map<String, Object> getArguments()
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
{
return null;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0faa796f1c..2328745b83 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
private AMQShortString _routingKey = new AMQShortString("routing key");
private DirectExchange _exchange;
private MockSubscription _subscription = new MockSubscription();
- private FieldTable _arguments = null;
+ private Map<String,Object> _arguments = null;
private MessagePublishInfo info = new MessagePublishInfo()
{
@@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
- false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
+ false, false, false, _arguments);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString());
}
@@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ try
+ {
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
+ false, _owner.asString(), false,
+ false, false, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("name"));
}
- try {
+ try
+ {
_queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
assertNull("Queue was created", _queue);
}
@@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false,
- false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
+ "differentName", false,
+ _owner.asString(), false,
+ false, false, _arguments);
assertNotNull("Queue was not created", _queue);
}
@@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _subscription;
}
- public FieldTable getArguments()
+ public Map<String,Object> getArguments()
{
return _arguments;
}
- public void setArguments(FieldTable arguments)
+ public void setArguments(Map<String,Object> arguments)
{
_arguments = arguments;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 4abb7233dc..c115af5a38 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
try
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false,
- "owner", false, false, test, null);
+ SimpleAMQQueue queue = (SimpleAMQQueue)
+ test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null);
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 67cf0780da..e9ad4ba236 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
@@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testBindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUnbindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueue() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String, Object> queueAttributes = new HashMap<String, Object>();
@@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
Exchange alternateExchange = createTestAlternateExchange();
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueExclusivity() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
- queue = createTestQueue(getName(), getName() + "Owner", false);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueAlternateExchange() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
- queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testRemoveQueue() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
@@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
anyMap());
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ final Map<String, Object> arguments) throws AMQStoreException
{
- return createTestQueue(queueName, queueOwner, exclusive, null);
+ return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ Exchange alternateExchange,
+ final Map<String, Object> arguments) throws AMQStoreException
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
@@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
+ if(arguments != null && !arguments.isEmpty())
+ {
+ when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+ final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
+ when(queue.getAttribute(requestedAttribute.capture())).then(
+ new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ String attrName = requestedAttribute.getValue();
+ return arguments.get(attrName);
+ }
+ });
+ }
+
return queue;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index caf6acb4bb..cb1fc2737d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -181,9 +181,8 @@ public class BrokerTestHelper
public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null,
- false, false, virtualHost, Collections.<String, Object>emptyMap());
- virtualHost.getQueueRegistry().registerQueue(queue);
+ SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
+ false, false, false, Collections.<String, Object>emptyMap());
return queue;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 2d3483f078..66a71c562f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
@@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
private ExchangeRegistry _exchangeRegistry;
- private QueueRegistry _queueRegistry;
+ private QueueFactory _queueFactory;
@Override
public void setUp() throws Exception
@@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+ when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class);
doAnswer(new Answer()
@@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public Object answer(final InvocationOnMock invocation) throws Throwable
{
Exchange exchange = registeredExchange.getValue();
- when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange);
- when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange);
return null;
}
}).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
- _queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry);
+ final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
- when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+ _queueFactory = mock(QueueFactory.class);
- final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class);
- doAnswer(new Answer()
- {
+ when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(),
+ anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+ new Answer()
+ {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- AMQQueue queue = registeredQueue.getValue();
- when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue);
- when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue);
- return null;
- }
- }).when(_queueRegistry).registerQueue(registeredQueue.capture());
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ final AMQQueue queue = mock(AMQQueue.class);
+
+ final String queueName = queueArg.getValue();
+ final UUID queueId = idArg.getValue();
+
+ when(queue.getName()).thenReturn(queueName);
+ when(queue.getId()).thenReturn(queueId);
+ when(_vhost.getQueue(eq(queueName))).thenReturn(queue);
+ when(_vhost.getQueue(eq(queueId))).thenReturn(queue);
+
+ final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class);
+ doAnswer(
+ new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Exchange value = altExchangeArg.getValue();
+ when(queue.getAlternateExchange()).thenReturn(value);
+ return null;
+ }
+ }
+ ).when(queue).setAlternateExchange(altExchangeArg.capture());
+
+ Map args = argsArg.getValue();
+ if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ {
+ final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
+ final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
+ queue.setAlternateExchange(exchange);
+ }
+ return queue;
+ }
+ });
_exchangeFactory = mock(ExchangeFactory.class);
+
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _exchangeRegistry),
+ new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory),
new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
new BindingRecoverer(_vhost, _exchangeRegistry)
};
@@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final UUID queueId = new UUID(1, 0);
final UUID exchangeId = new UUID(2, 0);
- /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than
- queue creation being on vhost - yuck! */
- SecurityManager securityManager = mock(SecurityManager.class);
- when(_vhost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),
- any(AMQShortString.class),anyString())).thenReturn(true);
- VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class);
- when(_vhost.getConfiguration()).thenReturn(configuration);
- QueueConfiguration queueConfiguration = mock(QueueConfiguration.class);
- when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration);
- LogActor logActor = mock(LogActor.class);
- CurrentActor.set(logActor);
- RootMessageLogger rootLogger = mock(RootMessageLogger.class);
- when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
- /* end of queue creation mock hackery */
-
final Exchange customExchange = mock(Exchange.class);
+ when(customExchange.getId()).thenReturn(exchangeId);
+ when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
+
when(_exchangeFactory.createExchange(eq(exchangeId),
eq(CUSTOM_EXCHANGE_NAME),
eq(HeadersExchange.TYPE.getType()),
@@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.completeConfigurationRecovery();
- assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange);
+ assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange());
}
private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 6769c1c2fc..1ca7ff1b65 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return null;
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+ return null;
+ }
+
+ @Override
public Exchange createExchange(UUID id,
String exchange,
String type,
@@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return null;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index e72196c383..03cb483e40 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(vhostName, config);
assertNotNull("virtualhost should exist", vhost);
- AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = vhost.getQueue(queueName);
assertNotNull("queue should exist", queue);
Exchange defaultExch = vhost.getDefaultExchange();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
index 602bdb66b5..c9f3aca91f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
@@ -97,7 +97,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
String clientID = _connection.getClientID();
assertNotNull("clientID should not be null", clientID);
-
+
validateQueueProperties(results, false, false, clientID);
}
@@ -256,7 +256,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
validateQueueProperties(results, true, true, null);
}
-
+
private List<String> waitForMesssage() throws IOException
{
// Validation
@@ -267,14 +267,14 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
// Only 1 Queue message should hav been logged
assertEquals("Result set size not as expected", 1, results.size());
-
+
return results;
}
public void validateQueueProperties(List<String> results, boolean hasPriority, boolean hasAutodelete, String clientID)
{
String log = getLogMessage(results, 0);
-
+
// Message Should be a QUE-1001
validateMessageID("QUE-1001", log);
@@ -290,7 +290,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
fromMessage(log).contains("Priority: " + PRIORITIES));
// Queue is AutoDelete
- assertEquals("Unexpected AutoDelete status:" + fromMessage(log), hasAutodelete,
+ assertEquals("Unexpected AutoDelete status:" + fromMessage(log), hasAutodelete,
fromMessage(log).contains("AutoDelete"));
if(clientID != null)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
index 14dee60124..844e3ecc11 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -336,7 +336,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
private Queue createQueue() throws AMQException, JMSException
{
final Map<String, Object> arguments = new HashMap<String, Object>();
- arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY);
+ arguments.put(QueueArgumentsConverter.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY);
((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
final Queue queue = new AMQQueue("amq.direct", getTestQueueName());
((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index a3551b8952..a57eca23bd 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.model.Broker;
@@ -49,7 +51,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -83,17 +85,17 @@ public class MessageStoreTest extends QpidTestCase
private String directExchangeName = "MST-DirectExchange";
private String topicExchangeName = "MST-TopicExchange";
- private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
- private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
- private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
- private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+ private String durablePriorityTopicQueueName = "MST-PriorityTopicQueue-Durable";
+ private String durableTopicQueueName = "MST-TopicQueue-Durable";
+ private String priorityTopicQueueName = "MST-PriorityTopicQueue";
+ private String topicQueueName = "MST-TopicQueue";
- private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
- private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
- private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
- private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
- private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
- private AMQShortString queueName = new AMQShortString("MST-Queue");
+ private String durableExclusiveQueueName = "MST-Queue-Durable-Exclusive";
+ private String durablePriorityQueueName = "MST-PriorityQueue-Durable";
+ private String durableLastValueQueueName = "MST-LastValueQueue-Durable";
+ private String durableQueueName = "MST-Queue-Durable";
+ private String priorityQueueName = "MST-PriorityQueue";
+ private String queueName = "MST-Queue";
private AMQShortString directRouting = new AMQShortString("MST-direct");
private AMQShortString topicRouting = new AMQShortString("MST-topic");
@@ -202,7 +204,7 @@ public class MessageStoreTest extends QpidTestCase
*/
public void testQueueExchangeAndBindingCreation() throws Exception
{
- assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size());
+ assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size());
createAllQueues();
createAllTopicQueues();
@@ -240,7 +242,7 @@ public class MessageStoreTest extends QpidTestCase
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered",
- 10, getVirtualHost().getQueueRegistry().getQueues().size());
+ 10, getVirtualHost().getQueues().size());
}
/**
@@ -269,13 +271,11 @@ public class MessageStoreTest extends QpidTestCase
{
testMessagePersistence();
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
assertEquals("Incorrect number of queues registered after recovery",
- 6, queueRegistry.getQueues().size());
+ 6, getVirtualHost().getQueues().size());
//clear the queue
- queueRegistry.getQueue(durableQueueName).clearQueue();
+ _virtualHost.getQueue(durableQueueName).clearQueue();
//check the messages are gone
validateMessageOnQueue(durableQueueName, 0);
@@ -294,7 +294,7 @@ public class MessageStoreTest extends QpidTestCase
public void testQueuePersistence() throws Exception
{
assertEquals("Should not be any existing queues",
- 0, getVirtualHost().getQueueRegistry().getQueues().size());
+ 0, getVirtualHost().getQueues().size());
//create durable and non durable queues/topics
createAllQueues();
@@ -303,20 +303,18 @@ public class MessageStoreTest extends QpidTestCase
//reload the virtual host, prompting recovery of the queues/topics
reloadVirtualHost();
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
assertEquals("Incorrect number of queues registered after recovery",
- 6, queueRegistry.getQueues().size());
+ 6, getVirtualHost().getQueues().size());
//Validate the non-Durable Queues were not recovered.
assertNull("Non-Durable queue still registered:" + priorityQueueName,
- queueRegistry.getQueue(priorityQueueName));
+ getVirtualHost().getQueue(priorityQueueName));
assertNull("Non-Durable queue still registered:" + queueName,
- queueRegistry.getQueue(queueName));
+ getVirtualHost().getQueue(queueName));
assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
- queueRegistry.getQueue(priorityTopicQueueName));
+ getVirtualHost().getQueue(priorityTopicQueueName));
assertNull("Non-Durable queue still registered:" + topicQueueName,
- queueRegistry.getQueue(topicQueueName));
+ getVirtualHost().getQueue(topicQueueName));
//Validate normally expected properties of Queues/Topics
validateDurableQueueProperties();
@@ -336,27 +334,24 @@ public class MessageStoreTest extends QpidTestCase
//Register Durable Queue
createQueue(durableQueueName, false, true, false, false);
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered before recovery",
- 1, queueRegistry.getQueues().size());
+ 1, getVirtualHost().getQueues().size());
reloadVirtualHost();
- queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after first recovery",
- 1, queueRegistry.getQueues().size());
+ 1, getVirtualHost().getQueues().size());
//test that removing the queue means it is not recovered next time
- final AMQQueue queue = queueRegistry.getQueue(durableQueueName);
+ final AMQQueue queue = getVirtualHost().getQueue(durableQueueName);
DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
reloadVirtualHost();
- queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of queues registered after second recovery",
- 0, queueRegistry.getQueues().size());
+ 0, getVirtualHost().getQueues().size());
assertNull("Durable queue was not removed:" + durableQueueName,
- queueRegistry.getQueue(durableQueueName));
+ getVirtualHost().getQueue(durableQueueName));
}
/**
@@ -450,34 +445,30 @@ public class MessageStoreTest extends QpidTestCase
*/
public void testDurableBindingRemoval() throws Exception
{
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
//create durable queue and exchange, bind them
Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
createQueue(durableQueueName, false, true, false, false);
- bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
+ bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false, null);
assertEquals("Incorrect number of bindings registered before recovery",
- 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
+ 1, getVirtualHost().getQueue(durableQueueName).getBindings().size());
//verify binding is actually normally recovered
reloadVirtualHost();
- queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of bindings registered after first recovery",
- 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
+ 1, getVirtualHost().getQueue(durableQueueName).getBindings().size());
exch = getVirtualHost().getExchange(directExchangeName);
assertNotNull("Exchange was not recovered", exch);
//remove the binding and verify result after recovery
- unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
+ unbindQueueFromExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false, null);
reloadVirtualHost();
- queueRegistry = getVirtualHost().getQueueRegistry();
assertEquals("Incorrect number of bindings registered after second recovery",
- 0, queueRegistry.getQueue(durableQueueName).getBindings().size());
+ 0, getVirtualHost().getQueue(durableQueueName).getBindings().size());
}
/**
@@ -514,15 +505,14 @@ public class MessageStoreTest extends QpidTestCase
/** Validates the Durable queues and their properties are as expected following recovery */
private void validateBindingProperties()
{
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size());
+ assertEquals("Incorrect number of (durable) queues following recovery", 6, getVirtualHost().getQueues().size());
- validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
- validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
- validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false);
- validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true);
- validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false);
+ validateBindingProperties(getVirtualHost().getQueue(durablePriorityQueueName).getBindings(), false);
+ validateBindingProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName).getBindings(), true);
+ validateBindingProperties(getVirtualHost().getQueue(durableQueueName).getBindings(), false);
+ validateBindingProperties(getVirtualHost().getQueue(durableTopicQueueName).getBindings(), true);
+ validateBindingProperties(getVirtualHost().getQueue(durableExclusiveQueueName).getBindings(), false);
}
/**
@@ -550,18 +540,14 @@ public class MessageStoreTest extends QpidTestCase
private void setQueueExclusivity(boolean exclusive) throws AMQException
{
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
+ AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
queue.setExclusive(exclusive);
}
private void validateQueueExclusivityProperty(boolean expected)
{
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
+ AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
}
@@ -569,14 +555,12 @@ public class MessageStoreTest extends QpidTestCase
private void validateDurableQueueProperties()
{
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false);
- validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true);
+ validateQueueProperties(getVirtualHost().getQueue(durablePriorityQueueName), true, true, false, false);
+ validateQueueProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName), true, true, false, false);
+ validateQueueProperties(getVirtualHost().getQueue(durableQueueName), false, true, false, false);
+ validateQueueProperties(getVirtualHost().getQueue(durableTopicQueueName), false, true, false, false);
+ validateQueueProperties(getVirtualHost().getQueue(durableExclusiveQueueName), false, true, true, false);
+ validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true);
}
private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
@@ -724,10 +708,10 @@ public class MessageStoreTest extends QpidTestCase
createQueue(topicQueueName, false, false, false, false);
}
- private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+ private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
- FieldTable queueArguments = null;
+ Map<String,Object> queueArguments = null;
if(usePriority || lastValueQueue)
{
@@ -736,14 +720,12 @@ public class MessageStoreTest extends QpidTestCase
if (usePriority)
{
- queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
+ queueArguments = Collections.singletonMap(Queue.PRIORITIES, (Object) DEFAULT_PRIORTY_LEVEL);
}
if (lastValueQueue)
{
- queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY);
+ queueArguments = Collections.singletonMap(Queue.LVQ_KEY, (Object) LVQ_KEY);
}
AMQQueue queue = null;
@@ -751,25 +733,17 @@ public class MessageStoreTest extends QpidTestCase
//Ideally we would be able to use the QueueDeclareHandler here.
try
{
- queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName.asString(), durable, queueOwner.asString(), false, exclusive,
- getVirtualHost(), FieldTable.convertToMap(queueArguments));
+ queue = getVirtualHost().createQueue(UUIDGenerator.generateRandomUUID(), queueName, durable, queueOwner.asString(), false, exclusive,
+ false, queueArguments);
validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(),
- queue,
- queueArguments);
- }
}
catch (AMQException e)
{
fail(e.getMessage());
}
- getVirtualHost().getQueueRegistry().registerQueue(queue);
-
}
private Map<String, Exchange> createExchanges()
@@ -805,28 +779,24 @@ public class MessageStoreTest extends QpidTestCase
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(QueueArgumentsConverter.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false, queueArguments);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false, null);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityQueueName), false, queueArguments);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(queueName), false, null);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false, null);
}
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
+ queueArguments.put(new AMQShortString(QueueArgumentsConverter.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true, queueArguments);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableTopicQueueName), true, null);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityTopicQueueName), true, queueArguments);
+ bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(topicQueueName), true, null);
}
@@ -894,9 +864,9 @@ public class MessageStoreTest extends QpidTestCase
}
}
- private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
+ private void validateMessageOnQueue(String queueName, long messageCount)
{
- AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = getVirtualHost().getQueue(queueName);
assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index cf066e3c01..2d6943f643 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.NotificationCheckTest;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueueTest;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
@@ -168,7 +169,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
public void testNewQueueWithDescription() throws Exception
{
String queueName = getTestQueueName();
- Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION);
+ Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION);
((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments);
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
@@ -181,7 +182,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
public void testQueueDescriptionSurvivesRestart() throws Exception
{
String queueName = getTestQueueName();
- Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION);
+ Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION);
((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments);
@@ -195,7 +196,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
}
/**
- * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests
+ * Tests queue creation with {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests
* that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}.
*/
public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception
@@ -204,7 +205,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
final Integer deliveryCount = 1;
- final Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount);
+ final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount);
managedBroker.createNewQueue(queueName, null, true, arguments);
// Ensure the queue exists
@@ -225,10 +226,10 @@ public class QueueManagementTest extends QpidBrokerTestCase
final Long maximumQueueDepth = 300l;
final Long maximumMessageAge = 400l;
final Map<String, Object> arguments = new HashMap<String, Object>();
- arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount);
- arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize);
- arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth);
- arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge);
+ arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount);
+ arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize);
+ arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth);
+ arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge);
managedBroker.createNewQueue(queueName, null, true, arguments);
@@ -642,7 +643,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
final Object messageGroupKey = "test";
- final Map<String, Object> arguments = Collections.singletonMap(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey);
+ final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey);
managedBroker.createNewQueue(queueName, null, true, arguments);
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
@@ -659,8 +660,8 @@ public class QueueManagementTest extends QpidBrokerTestCase
final Object messageGroupKey = "test";
final Map<String, Object> arguments = new HashMap<String, Object>(2);
- arguments.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey);
- arguments.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey);
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
managedBroker.createNewQueue(queueName, null, true, arguments);
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 940d6a3298..969f222316 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
@@ -291,7 +292,7 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertQueue(queueName , "lvq", lvqQueue);
assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE));
- assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY));
+ assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY));
}
public void testPutCreateSortedQueueWithoutKey() throws Exception
@@ -460,7 +461,7 @@ public class VirtualHostRestTest extends QpidRestTestCase
String queueName = getTestQueueName();
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
//verify the starting state
Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 67a2988ad1..bd826259bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -200,8 +200,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"{" +
"exclusive: true," +
"arguments: {" +
- "'qpid.max_size': 1000," +
- "'qpid.max_count': 100" +
+ "'qpid.alert_size': 1000," +
+ "'qpid.alert_count': 100" +
"}" +
"}, " +
"x-bindings: [{exchange : 'amq.direct', key : test}, " +
@@ -401,7 +401,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"x-declare: " +
"{ " +
"auto-delete: true," +
- "arguments: {'qpid.max_count': 100}" +
+ "arguments: {'qpid.alert_count': 100}" +
"}, " +
"x-bindings: [{exchange : 'amq.direct', key : test}, " +
"{exchange : 'amq.topic', key : 'a.#'}," +
@@ -485,7 +485,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
Hashtable<String,String> map = new Hashtable<String,String>();
map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
- "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}");
+ "{x-declare: {auto-delete: true, arguments : {'qpid.alert_size': 1000}}}}");
map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }");
@@ -603,7 +603,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
String addr = "ADDR:amq.direct/x512; {" +
"link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
- "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
+ "arguments : {'qpid.alert_size': 1000, 'qpid.policy_type': ring} } } }";
queue = ssn.createQueue(addr);
cons = ssn.createConsumer(queue);
@@ -1403,7 +1403,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," +
"alternate-exchange: 'amq.fanout'," +
- "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" +
+ "arguments: {'qpid.alert_size': 1000,'qpid.alert_count': 100}" +
"}";
String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}";