diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
| commit | c4794154e1c058d6761ef59f525deb68b42ef91f (patch) | |
| tree | be8fdcd5918be68b36cc983b40b8ad10d93244dd /java | |
| parent | 9d4a19f07a6d54fbc0dbc59e750992f6abe45a40 (diff) | |
| download | qpid-python-c4794154e1c058d6761ef59f525deb68b42ef91f.tar.gz | |
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
60 files changed, 1419 insertions, 883 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 2350e28ee2..a4383d94c4 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1403,6 +1403,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { if (_stateManager.isInState(State.ACTIVE)) { + LOGGER.debug("Storing configured object: " + configuredObject); DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); keyBinding.objectToEntry(configuredObject.getId(), key); @@ -1430,6 +1431,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws AMQStoreException { + + LOGGER.debug("Removing configured object: " + id); DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); uuidBinding.objectToEntry(id, key); diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 464608ff59..5d84dc2c91 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; import org.apache.qpid.server.util.MapJsonSerializer; @@ -91,6 +92,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString(), ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString(), ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString() }; private static final Set<String> DEFAULT_EXCHANGES_SET = new HashSet<String>(Arrays.asList(DEFAULT_EXCHANGES)); + private static final String ARGUMENTS = "arguments"; private MapJsonSerializer _serializer = new MapJsonSerializer(); @@ -580,10 +582,10 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade if (moveNonExclusiveOwnerToDescription(owner, exclusive)) { - _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + _logger.info("Non-exclusive owner " + owner + " for queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION); attributesMap.put(Queue.OWNER, null); - argumentsCopy.put(AMQShortString.valueOf(AMQQueueFactory.X_QPID_DESCRIPTION), owner); + argumentsCopy.put(AMQShortString.valueOf(QueueArgumentsConverter.X_QPID_DESCRIPTION), owner); } else { @@ -591,7 +593,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade } if (!argumentsCopy.isEmpty()) { - attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(argumentsCopy)); + attributesMap.put(ARGUMENTS, FieldTable.convertToMap(argumentsCopy)); } return attributesMap; } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index 390d667db0..63af8d3840 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -48,6 +48,7 @@ import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.url.URLSyntaxException; /** @@ -159,7 +160,7 @@ public class BDBStoreUpgradeTestPreparer session = connection.createSession(true, Session.SESSION_TRANSACTED); // Create a priority queue on broker final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); - priorityQueueArguments.put("x-qpid-priorities",10); + priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10); createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); // Create a queue that has a DLQ @@ -342,4 +343,4 @@ public class BDBStoreUpgradeTestPreparer BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); producer.prepareBroker(); } -}
\ No newline at end of file +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index c33d427868..44f0861275 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; @@ -76,6 +77,7 @@ import com.sleepycat.je.Transaction; public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class); + private static final String ARGUMENTS = "arguments"; @Override protected String getStoreDirectoryName() @@ -287,12 +289,12 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null)); final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>(); - queueWithOwnerArguments.put("x-qpid-priorities", 10); - queueWithOwnerArguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description"); + queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10); + queueWithOwnerArguments.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, "misused-owner-as-description"); expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments)); final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>(); - priorityQueueArguments.put("x-qpid-priorities", 10); + priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES, 10); expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments)); final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>(); @@ -388,7 +390,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase expectedQueueEntry.put(Queue.OWNER, owner); if (argumentMap != null) { - expectedQueueEntry.put(Queue.ARGUMENTS, argumentMap); + expectedQueueEntry.put(ARGUMENTS, argumentMap); } return expectedQueueEntry; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8e79813216..60211823f8 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,24 +20,26 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.LinkedHashMap; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; @@ -61,6 +63,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; import org.apache.qpid.transport.*; import java.nio.ByteBuffer; @@ -72,11 +75,6 @@ public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); - /** - * No-local queue argument is used to support the no-local feature of Durable Subscribers. - */ - private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local"; - public ServerSessionDelegate() { @@ -195,10 +193,9 @@ public class ServerSessionDelegate extends SessionDelegate else { String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); + VirtualHost vhost = getVirtualHost(session); - - final AMQQueue queue = queueRegistry.getQueue(queueName); + final AMQQueue queue = vhost.getQueue(queueName); if(queue == null) { @@ -929,7 +926,6 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -947,7 +943,7 @@ public class ServerSessionDelegate extends SessionDelegate { method.setBindingKey(method.getQueue()); } - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -991,7 +987,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeUnbind(Session session, ExchangeUnbind method) { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -1007,7 +1002,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -1174,158 +1169,137 @@ public class ServerSessionDelegate extends SessionDelegate private AMQQueue getQueue(Session session, String queue) { - QueueRegistry queueRegistry = getQueueRegistry(session); - return queueRegistry.getQueue(queue); - } - - private QueueRegistry getQueueRegistry(Session session) - { - return getVirtualHost(session).getQueueRegistry(); + return getVirtualHost(session).getQueue(queue); } @Override public void queueDeclare(Session session, final QueueDeclare method) { - VirtualHost virtualHost = getVirtualHost(session); + final VirtualHost virtualHost = getVirtualHost(session); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); AMQQueue queue; - QueueRegistry queueRegistry = getQueueRegistry(session); //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - synchronized (queueRegistry) + final boolean exclusive = method.getExclusive(); + final boolean autoDelete = method.getAutoDelete(); + + if(method.getPassive()) { + queue = virtualHost.getQueue(queueName); - if (((queue = queueRegistry.getQueue(queueName)) == null)) + if (queue == null) { + String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; - if (method.getPassive()) - { - String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; + exception(session, method, errorCode, description); - exception(session, method, errorCode, description); + } + else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { + String description = "Cannot declare queue('" + queueName + "')," + + " as exclusive queue with same name " + + "declared on another session"; + ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - return; - } - else + exception(session, method, errorCode, description); + + } + } + else + { + + try + { + + String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; + final String alternateExchangeName = method.getAlternateExchange(); + + + final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments()); + + if(alternateExchangeName != null && alternateExchangeName.length() != 0) { - try - { - queue = createQueue(queueName, method, virtualHost, (ServerSession)session); - if(!method.getExclusive() && method.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } + arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName); + } - final String alternateExchangeName = method.getAlternateExchange(); - if(alternateExchangeName != null && alternateExchangeName.length() != 0) - { - Exchange alternate = getExchange(session, alternateExchangeName); - queue.setAlternateExchange(alternate); - } + final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - if(method.hasArguments() && method.getArguments() != null) - { - if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL)) - { - Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL); - queue.setNoLocal(convertBooleanValue(noLocal)); - } - } + final boolean deleteOnNoConsumer = !exclusive && autoDelete; + queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, + autoDelete, exclusive, deleteOnNoConsumer, + arguments); - if (queue.isDurable() && !queue.isAutoDelete()) + if (autoDelete && exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task deleteQueueTask = new ServerSession.Task() { - if(method.hasArguments() && method.getArguments() != null) + public void doTask(ServerSession session) { - Map<String,Object> args = method.getArguments(); - FieldTable ftArgs = new FieldTable(); - for(Map.Entry<String, Object> entry : args.entrySet()) + try + { + virtualHost.removeQueue(q); + } + catch (AMQException e) { - ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); + exception(session, method, e, "Cannot delete '" + method.getQueue()); } - DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs); } - else + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(deleteQueueTask); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - DurableConfigurationStoreHelper.createQueue(store, queue, null); + s.removeSessionCloseTask(deleteQueueTask); } - } - queueRegistry.registerQueue(queue); - - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + }); + } + if (exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task removeExclusive = new ServerSession.Task() + { + public void doTask(ServerSession session) { - final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - try - { - q.delete(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete '" + method.getQueue()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + q.setAuthorizationHolder(null); + q.setExclusiveOwningSession(null); } - if (method.hasExclusive() - && method.getExclusive()) + }; + final ServerSession s = (ServerSession) session; + q.setExclusiveOwningSession(s); + s.addSessionCloseTask(removeExclusive); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(removeExclusive); - } - }); + s.removeSessionCloseTask(removeExclusive); } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare queue '" + queueName); - } + }); } } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + catch(QueueExistsException qe) { + queue = qe.getExistingQueue(); + if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; exception(session, method, errorCode, description); - - return; + } + } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare queue '" + queueName); } } } @@ -1354,20 +1328,6 @@ public class ServerSessionDelegate extends SessionDelegate return false; } - protected AMQQueue createQueue(final String queueName, - final QueueDeclare body, - VirtualHost virtualHost, - final ServerSession session) - throws AMQException - { - String owner = body.getExclusive() ? session.getClientID() : null; - - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner, - body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - - return queue; - } - @Override public void queueDelete(Session session, QueueDelete method) { @@ -1412,12 +1372,7 @@ public class ServerSessionDelegate extends SessionDelegate try { - queue.delete(); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store,queue); - } + virtualHost.removeQueue(queue); } catch (AMQException e) { @@ -1471,7 +1426,14 @@ public class ServerSessionDelegate extends SessionDelegate result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); result.setAutoDelete(queue.isAutoDelete()); - result.setArguments(queue.getArguments()); + Map<String, Object> arguments = new LinkedHashMap<String, Object>(); + Collection<String> availableAttrs = queue.getAvailableAttributes(); + + for(String attrName : availableAttrs) + { + arguments.put(attrName, queue.getAttribute(attrName)); + } + result.setArguments(QueueArgumentsConverter.convertModelArgsToWire(arguments)); result.setMessageCount(queue.getMessageCount()); result.setSubscriberCount(queue.getConsumerCount()); @@ -1491,7 +1453,7 @@ public class ServerSessionDelegate extends SessionDelegate if(sub == null) { - exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'"); } else if(sub.isStopped()) { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index c6bceb6ac7..63582702cb 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; @@ -33,6 +30,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.message.InboundMessage; @@ -40,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -65,7 +64,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -169,9 +167,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _queue = queue; - Map<String, Object> arguments = queue.getArguments(); - _traceExclude = (String) arguments.get("qpid.trace.exclude"); - _trace = (String) arguments.get("qpid.trace.id"); + _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); + _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); String filterLogString = null; _logActor = GenericActor.getInstance(this); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 6577efe292..4e620327c9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -73,7 +73,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic " args:" + body.getArguments()); } - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern()); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); if (queue == null) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 8883422989..5238a41e49 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -74,7 +74,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB else { channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); if (queue == null) { _log.info("No queue for '" + body.getQueue() + "'"); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 85f0a6fd3d..ba5692fc6c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -70,7 +70,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MethodRegistry methodRegistry = session.getMethodRegistry(); final AMQChannel channel = session.getChannel(channelId); @@ -115,7 +114,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { - AMQQueue queue = queueRegistry.getQueue(queueName); + AMQQueue queue = virtualHost.getQueue(queueName.toString()); if (queue == null) { @@ -141,7 +140,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo } else if (queueName != null) { - AMQQueue queue = queueRegistry.getQueue(queueName); + AMQQueue queue = virtualHost.getQueue(queueName.toString()); if (queue == null) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index a8e4e38422..359bd2eb19 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) @@ -73,7 +72,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> final AMQQueue queue; final AMQShortString routingKey; - if (body.getQueue() == null) + final AMQShortString queueName = body.getQueue(); + + if (queueName == null) { queue = channel.getDefaultQueue(); @@ -94,13 +95,13 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(queueName.toString()); routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); } if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); final Exchange exch = virtualHost.getExchange(exchangeName); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 9f887d881d..fd547d4bac 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; @@ -44,6 +45,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -61,8 +63,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); final AMQSessionModel session = protocolConnection.getChannel(channelId); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); final AMQShortString queueName; @@ -87,97 +87,103 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throw body.getChannelNotFoundException(channelId); } - synchronized (queueRegistry) + if(body.getPassive()) { - queue = queueRegistry.getQueue(queueName); - - AMQSessionModel owningSession = null; - - if (queue != null) + queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) { - owningSession = queue.getExclusiveOwningSession(); + String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } - - if (queue == null) + else { - if (body.getPassive()) + AMQSessionModel owningSession = queue.getExclusiveOwningSession(); + if (queue.isExclusive() && !queue.isDurable() + && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); } - else + + //set this as the default queue on the channel: + channel.setDefaultQueue(queue); + } + } + else + { + + try + { + + queue = createQueue(queueName, body, virtualHost, protocolConnection); + queue.setAuthorizationHolder(protocolConnection); + + if (body.getExclusive()) { - queue = createQueue(queueName, body, virtualHost, protocolConnection); + queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); queue.setAuthorizationHolder(protocolConnection); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments()); - } - if(body.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } - queueRegistry.registerQueue(queue); - if (body.getExclusive()) - { - queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); - if(!body.getDurable()) + if(!body.getDurable()) + { + final AMQQueue q = queue; + final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { - final AMQQueue q = queue; - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() + public void doTask(AMQProtocolSession session) throws AMQException { - public void doTask(AMQProtocolSession session) throws AMQException - { - q.setExclusiveOwningSession(null); - } - }; - protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } + q.setExclusiveOwningSession(null); + } + }; + protocolConnection.addSessionCloseTask(sessionCloseTask); + queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException + { + protocolConnection.removeSessionCloseTask(sessionCloseTask); + } + }); } } - } - else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); - } - else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive())) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: " - + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) + catch(QueueExistsException qe) { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " - + "as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); - } - else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: " - + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); - } - else if(!body.getPassive() && queue.isDurable() != body.getDurable()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: " - + queue.isDurable() + " requested " + body.getDurable() + ")"); - } + queue = qe.getExistingQueue(); + AMQSessionModel owningSession = queue.getExclusiveOwningSession(); + + if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + } + else if(queue.isExclusive() != body.getExclusive()) + { + + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: " + + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + } + else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " + + "as exclusive queue with same name " + + "declared on another client ID('" + + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); + } + else if(queue.isAutoDelete() != body.getAutoDelete()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: " + + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + } + else if(queue.isDurable() != body.getDurable()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: " + + queue.isDurable() + " requested " + body.getDurable() + ")"); + } + } //set this as the default queue on the channel: channel.setDefaultQueue(queue); @@ -204,30 +210,35 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar protected AMQQueue createQueue(final AMQShortString queueName, QueueDeclareBody body, - VirtualHost virtualHost, + final VirtualHost virtualHost, final AMQProtocolSession session) throws AMQException { - final QueueRegistry registry = virtualHost.getQueueRegistry(); - String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null; - Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments()); + final boolean durable = body.getDurable(); + final boolean autoDelete = body.getAutoDelete(); + final boolean exclusive = body.getExclusive(); + + String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; + + Map<String, Object> arguments = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); String queueNameString = AMQShortString.toString(queueName); + final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()), - queueNameString, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(),virtualHost, arguments); + final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, + exclusive, autoDelete, arguments); - if (body.getExclusive() && !body.getDurable()) + if (exclusive && !durable) { final AMQProtocolSession.Task deleteQueueTask = new AMQProtocolSession.Task() { public void doTask(AMQProtocolSession session) throws AMQException { - if (registry.getQueue(queueName) == queue) + if (virtualHost.getQueue(queueName.toString()) == queue) { - queue.delete(); + virtualHost.removeQueue(queue); } } }; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 6f5e0ea992..a39faf2e70 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -62,7 +62,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); @@ -82,7 +81,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); } if (queue == null) @@ -112,12 +111,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); } - int purged = queue.delete(); - - if (queue.isDurable()) - { - DurableConfigurationStoreHelper.removeQueue(store, queue); - } + int purged = virtualHost.removeQueue(queue); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index e925eb7455..ff845d3c16 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -60,7 +60,6 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) @@ -84,7 +83,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); } if(queue == null) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index aad5446cb5..20405b82ab 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -59,8 +59,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final AMQQueue queue; final AMQShortString routingKey; @@ -87,7 +85,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index ca67b6f79b..35f24afbce 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -110,10 +110,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(destination instanceof QueueDestination) { queue = ((QueueDestination) _destination).getQueue(); - if(queue.getArguments() != null && queue.getArguments().containsKey("topic")) + + if(queue.getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } + qd = (QueueDestination) destination; Map<Symbol,Filter> filters = source.getFilter(); @@ -194,19 +196,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS name = UUID.randomUUID().toString(); } - queue = _vhost.getQueueRegistry().getQueue(name); + queue = _vhost.getQueue(name); Exchange exchange = exchangeDestination.getExchange(); if(queue == null) { - queue = AMQQueueFactory.createAMQQueueImpl( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, null, true, true, - _vhost, + true, Collections.EMPTY_MAP); } else @@ -309,11 +311,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { @@ -417,7 +419,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - queue.delete(); + queue.getVirtualHost().removeQueue(queue); } catch(AMQException e) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ed75a8c165..d3962c779c 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -107,7 +107,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -256,7 +256,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } else { - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -329,14 +329,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - _vhost, - properties); + final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), + queueName, + false, // durable + null, // owner + false, // autodelete + false, // exclusive + false, + properties); @@ -347,11 +347,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 67ac1bdc7c..2c88f83405 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; @MBeanDescription("This MBean exposes the broker level management features") public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker @@ -180,7 +181,8 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi throws IOException, JMException { final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments); - getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs); + getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, + QueueArgumentsConverter.convertWireArgsToModel(createArgs)); } @@ -196,11 +198,11 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi if (_moveNonExclusiveQueueOwnerToDescription && owner != null) { argumentsCopy = new HashMap<String, Object>(arguments == null ? new HashMap<String, Object>() : arguments); - if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION)) + if (!argumentsCopy.containsKey(QueueArgumentsConverter.X_QPID_DESCRIPTION)) { - LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION); - argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner); + argumentsCopy.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, owner); } else { diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index e3fac9f711..4240dd5280 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; public class VirtualHostManagerMBeanTest extends TestCase { @@ -79,16 +80,16 @@ public class VirtualHostManagerMBeanTest extends TestCase { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); - Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER); + Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception { - Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); - Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 53dd6df599..631490ab5f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -167,11 +167,6 @@ public abstract class AbstractExchange implements Exchange return _virtualHost; } - public QueueRegistry getQueueRegistry() - { - return getVirtualHost().getQueueRegistry(); - } - public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 2873eb31e8..8e9f980e6b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange { + private final QueueRegistry _queueRegistry; private UUID _id; private VirtualHost _virtualHost; private static final Logger _logger = Logger.getLogger(DefaultExchange.class); @@ -55,6 +56,11 @@ public class DefaultExchange implements Exchange private LogSubject _logSubject; private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); + public DefaultExchange(QueueRegistry queueRegistry) + { + _queueRegistry = queueRegistry; + } + @Override public void initialise(UUID id, @@ -82,7 +88,7 @@ public class DefaultExchange implements Exchange @Override public long getBindingCount() { - return _virtualHost.getQueueRegistry().getQueues().size(); + return _virtualHost.getQueues().size(); } @Override @@ -146,7 +152,7 @@ public class DefaultExchange implements Exchange @Override public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) { - if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) + if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) { return convertToBinding(queue); } @@ -207,7 +213,7 @@ public class DefaultExchange implements Exchange @Override public List<AMQQueue> route(InboundMessage message) { - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey()); + AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) { List<AMQQueue> noQueues = Collections.emptyList(); @@ -235,13 +241,13 @@ public class DefaultExchange implements Exchange @Override public boolean isBound(AMQShortString routingKey) { - return _virtualHost.getQueueRegistry().getQueue(routingKey) != null; + return _virtualHost.getQueue(routingKey == null ? null : routingKey.toString()) != null; } @Override public boolean isBound(AMQQueue queue) { - return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue; + return _virtualHost.getQueue(queue.getName()) == queue; } @Override @@ -283,7 +289,7 @@ public class DefaultExchange implements Exchange @Override public boolean isBound(String bindingKey) { - return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; + return _virtualHost.getQueue(bindingKey) != null; } @Override @@ -320,7 +326,7 @@ public class DefaultExchange implements Exchange public Collection<Binding> getBindings() { List<Binding> bindings = new ArrayList<Binding>(); - for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues()) + for(AMQQueue q : _virtualHost.getQueues()) { bindings.add(convertToBinding(q)); } @@ -330,7 +336,7 @@ public class DefaultExchange implements Exchange @Override public void addBindingListener(BindingListener listener) { - _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates. + _queueRegistry.addRegistryChangeListener(convertListener(listener)); } private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 75c489c731..d8263a3c80 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -27,6 +27,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -40,20 +41,23 @@ import java.util.concurrent.ConcurrentMap; public class DefaultExchangeRegistry implements ExchangeRegistry { private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); - /** * Maps from exchange name to exchange instance */ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); private Exchange _defaultExchange; - private VirtualHost _host; + + private final VirtualHost _host; + private final QueueRegistry _queueRegistry; + private final Collection<RegistryChangeListener> _listeners = Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>()); - public DefaultExchangeRegistry(VirtualHost host) + public DefaultExchangeRegistry(VirtualHost host, QueueRegistry queueRegistry) { _host = host; + _queueRegistry = queueRegistry; } public void initialise(ExchangeFactory exchangeFactory) throws AMQException @@ -61,7 +65,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry //create 'standard' exchanges: new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore()); - _defaultExchange = new DefaultExchange(); + _defaultExchange = new DefaultExchange(_queueRegistry); UUID defaultExchangeId = UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java index 6fe0607ab2..ae2031bd71 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -89,6 +89,7 @@ public interface Queue extends ConfiguredObject public static final String EXCLUSIVE = "exclusive"; public static final String MESSAGE_GROUP_KEY = "messageGroupKey"; public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; + public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; public static final String LVQ_KEY = "lvqKey"; public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; public static final String NO_LOCAL = "noLocal"; @@ -100,6 +101,10 @@ public interface Queue extends ConfiguredObject public static final String TYPE = "type"; public static final String PRIORITIES = "priorities"; + public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change + + public static final String FEDERATION_EXCLUDES = "federationExcludes"; + public static final String FEDERATION_ID = "federationId"; public static final Collection<String> AVAILABLE_ATTRIBUTES = @@ -134,6 +139,7 @@ public interface Queue extends ConfiguredObject PRIORITIES )); + //children Collection<Binding> getBindings(); Collection<Consumer> getConsumers(); @@ -144,6 +150,6 @@ public interface Queue extends ConfiguredObject void visit(QueueEntryVisitor visitor); void delete(); - + void setNotificationListener(QueueNotificationListener listener); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index a84a041b72..26ac99d5bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -120,7 +120,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); - int CURRENT_CONFIG_VERSION = 2; + int CURRENT_CONFIG_VERSION = 3; //children Collection<VirtualHostAlias> getAliases(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 157b97cc07..96a7eacb92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -66,25 +66,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs put(DESCRIPTION, String.class); }}); - static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>(); - static - { - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES); - } - private final AMQQueue _queue; private final Map<Binding, BindingAdapter> _bindingAdapters = new HashMap<Binding, BindingAdapter>(); @@ -190,15 +171,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { try { - QueueRegistry queueRegistry = _queue.getVirtualHost().getQueueRegistry(); - synchronized(queueRegistry) - { - _queue.delete(); - if (_queue.isDurable()) - { - DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue); - } - } + _queue.getVirtualHost().removeQueue(_queue); } catch(AMQException e) { @@ -414,13 +387,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } else if(MESSAGE_GROUP_KEY.equals(name)) { - return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY); + return _queue.getAttribute(MESSAGE_GROUP_KEY); } else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) { //We only return the boolean value if message groups are actually in use - return getAttribute(MESSAGE_GROUP_KEY) == null ? null : - SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + return getAttribute(MESSAGE_GROUP_KEY) == null ? null : _queue.getAttribute(MESSAGE_GROUP_SHARED_GROUPS); } else if(LVQ_KEY.equals(name)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index c09dd9449e..977fd5ae56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -41,12 +41,9 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.SystemConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -68,6 +65,7 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -86,6 +84,7 @@ import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostListener; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener { @@ -203,7 +202,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual private void populateQueues() { - Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues(); + Collection<AMQQueue> actualQueues = _virtualHost.getQueues(); if ( actualQueues != null ) { synchronized(_queueAdapters) @@ -399,7 +398,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY); + attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); } else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) { @@ -415,7 +414,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes); attributes.remove(Queue.MESSAGE_GROUP_KEY); - attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key); + attributes.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, key); } if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS)) @@ -423,7 +422,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes)) { attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS); - attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + attributes.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); } } @@ -440,15 +439,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual attributes.remove(Queue.LIFETIME_POLICY); attributes.remove(Queue.TIME_TO_LIVE); - List<String> attrNames = new ArrayList<String>(attributes.keySet()); - for(String attr : attrNames) - { - if(QueueAdapter.ATTRIBUTE_MAPPINGS.containsKey(attr)) - { - attributes.put(QueueAdapter.ATTRIBUTE_MAPPINGS.get(attr),attributes.remove(attr)); - } - } - return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes); } @@ -472,33 +462,26 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual owner = authenticatedPrincipal.getName(); } } + + final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE; + try { - QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - synchronized (queueRegistry) - { - if(_virtualHost.getQueueRegistry().getQueue(name)!=null) - { - throw new IllegalArgumentException("Queue with name "+name+" already exists"); - } - AMQQueue queue = - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, - durable, owner, lifetime == LifetimePolicy.AUTO_DELETE, - exclusive, _virtualHost, attributes); - if(durable) - { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), - queue, - FieldTable.convertToFieldTable(attributes)); - } - synchronized (_queueAdapters) - { - return _queueAdapters.get(queue); - } + AMQQueue queue = + _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, + durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes); + + synchronized (_queueAdapters) + { + return _queueAdapters.get(queue); } } + catch(QueueExistsException qe) + { + throw new IllegalArgumentException("Queue with name "+name+" already exists"); + } catch(AMQException e) { throw new IllegalArgumentException(e); @@ -1057,7 +1040,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if(VirtualHost.QUEUE_COUNT.equals(name)) { - return _vhost.getQueueRegistry().getQueues().size(); + return _vhost.getQueues().size(); } else if(VirtualHost.EXCHANGE_COUNT.equals(name)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4f610cc925..cb6a9249d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -225,7 +225,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa void setAlternateExchange(Exchange exchange); - Map<String, Object> getArguments(); + Collection<String> getAvailableAttributes(); + Object getAttribute(String attrName); void checkCapacity(AMQSessionModel channel); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 1eeb6dccf3..5001c2fd2b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -29,42 +29,31 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.VirtualHost; -public class AMQQueueFactory +public class AMQQueueFactory implements QueueFactory { - public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity"; - public static final String X_QPID_CAPACITY = "x-qpid-capacity"; - public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap"; - public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count"; - public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size"; - public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age"; - public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth"; - - public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; - public static final String X_QPID_DESCRIPTION = "x-qpid-description"; - public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; - public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; - public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; - public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + - public static final String DLQ_ROUTING_KEY = "dlq"; - public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; - public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + public static final String DLQ_ROUTING_KEY = "dlq"; + + private final VirtualHost _virtualHost; + private final QueueRegistry _queueRegistry; - private AMQQueueFactory() + public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry) { + _virtualHost = virtualHost; + _queueRegistry = queueRegistry; } private abstract static class QueueProperty @@ -129,56 +118,56 @@ public class AMQQueueFactory } private static final QueueProperty[] DECLAREABLE_PROPERTIES = { - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE) + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageAge(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE) + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageSize(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT) + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageCount(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH) + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumQueueDepth(value); } }, - new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP) + new QueueLongProperty(Queue.ALERT_REPEAT_GAP) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMinimumAlertRepeatGap(value); } }, - new QueueLongProperty(X_QPID_CAPACITY) + new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setCapacity(value); } }, - new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY) + new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setFlowResumeCapacity(value); } }, - new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT) + new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) { public void setPropertyValue(AMQQueue queue, int value) { @@ -189,13 +178,17 @@ public class AMQQueueFactory /** * @param id the id to use. + * @param deleteOnNoConsumer */ - public static AMQQueue createAMQQueueImpl(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException + @Override + public AMQQueue createAMQQueueImpl(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException { if (id == null) { @@ -206,16 +199,11 @@ public class AMQQueueFactory throw new IllegalArgumentException("Queue name must not be null"); } - // Access check - if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) - { - String description = "Permission denied: queue-name '" + queueName + "'"; - throw new AMQSecurityException(description); - } - QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName); - boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration); - if (isDLQEnabled) + QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + if (createDLQ) { validateDLNames(queueName); } @@ -226,17 +214,17 @@ public class AMQQueueFactory if(arguments != null) { - if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) + if(arguments.containsKey(Queue.LVQ_KEY)) { - conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY); + conflationKey = (String) arguments.get(Queue.LVQ_KEY); if(conflationKey == null) { - conflationKey = QPID_LVQ_KEY; + conflationKey = QPID_DEFAULT_LVQ_KEY; } } - else if(arguments.containsKey(X_QPID_PRIORITIES)) + else if(arguments.containsKey(Queue.PRIORITIES)) { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); + Object prioritiesObj = arguments.get(Queue.PRIORITIES); if(prioritiesObj instanceof Number) { priorities = ((Number)prioritiesObj).intValue(); @@ -257,33 +245,36 @@ public class AMQQueueFactory // TODO - should warn here of invalid format } } - else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) + else if(arguments.containsKey(Queue.SORT_KEY)) { - sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY); + sortingKey = (String)arguments.get(Queue.SORT_KEY); } } AMQQueue q; if(sortingKey != null) { - q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); } else if(conflationKey != null) { - q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); + q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); + q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); } else { - q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); + q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); } + q.setDeleteOnNoConsumers(deleteOnNoConsumer); + //Register the new queue - virtualHost.getQueueRegistry().registerQueue(q); - q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName)); + _queueRegistry.registerQueue(q); + + q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); if(arguments != null) { @@ -294,21 +285,25 @@ public class AMQQueueFactory p.setPropertyValue(q, arguments.get(p.getArgumentName().toString())); } } + + if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) + { + q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); + } + } - if(isDLQEnabled) + if(createDLQ) { final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); - final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - Exchange dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()); + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); try { - dlExchange = virtualHost.createExchange(dlExchangeId, + dlExchange = _virtualHost.createExchange(dlExchangeId, dlExchangeName, ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), true, false, null); @@ -321,23 +316,19 @@ public class AMQQueueFactory AMQQueue dlQueue = null; - synchronized(queueRegistry) + synchronized(_queueRegistry) { - dlQueue = queueRegistry.getQueue(dlQueueName); + dlQueue = _queueRegistry.getQueue(dlQueueName); if(dlQueue == null) { //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc final Map<String, Object> args = new HashMap<String, Object>(); - args.put(X_QPID_DLQ_ENABLED, false); - args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); - - dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - //enter the dlq in the persistent store - DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(), - dlQueue, - FieldTable.convertToFieldTable(args)); + dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, + false, args); } } @@ -350,11 +341,31 @@ public class AMQQueueFactory } q.setAlternateExchange(dlExchange); } + else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + { + + final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; + try + { + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); + } + catch(IllegalArgumentException e) + { + altExchange = _virtualHost.getExchange(altExchangeAttr); + } + q.setAlternateExchange(altExchange); + } + + if (q.isDurable() && !q.isAutoDelete()) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + } return q; } - public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException + public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException { String queueName = config.getName(); @@ -365,9 +376,9 @@ public class AMQQueueFactory Map<String, Object> arguments = createQueueArgumentsFromConfig(config); // we need queues that are defined in config to have deterministic ids. - UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName()); + UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments); + AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, false, arguments); q.configure(config); return q; } @@ -414,21 +425,23 @@ public class AMQQueueFactory * queue configuration * @return true if DLQ enabled */ - protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) { //feature is not to be enabled for temporary queues or when explicitly disabled by argument - if (!autoDelete) + if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { - boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED); + boolean dlqArgumentPresent = arguments != null + && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) { boolean dlqEnabled = true; if (dlqArgumentPresent) { - Object argument = arguments.get(X_QPID_DLQ_ENABLED); - dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue(); + Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) + || (argument instanceof String && Boolean.parseBoolean(argument.toString())); } - return dlqEnabled; + return dlqEnabled ; } } return false; @@ -464,31 +477,30 @@ public class AMQQueueFactory if(config.getArguments() != null && !config.getArguments().isEmpty()) { - arguments.putAll(config.getArguments()); + arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); } if(config.isLVQ() || config.getLVQKey() != null) { - arguments.put(QPID_LAST_VALUE_QUEUE, 1); - arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); + arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); } else if (config.getPriority() || config.getPriorities() > 0) { - arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); } else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) { - arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); + arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); } if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - arguments.put(X_QPID_DLQ_ENABLED, true); + arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); } if (config.getDescription() != null && !"".equals(config.getDescription())) { - arguments.put(X_QPID_DESCRIPTION, config.getDescription()); + arguments.put(Queue.DESCRIPTION, config.getDescription()); } if (arguments.isEmpty()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 27a9e13617..7308433759 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -59,8 +59,9 @@ public class DefaultQueueRegistry implements QueueRegistry } } - public void unregisterQueue(AMQShortString name) + public void unregisterQueue(String nameString) { + AMQShortString name = new AMQShortString(nameString); AMQQueue q = _queueMap.remove(name); if(q != null) { @@ -74,16 +75,11 @@ public class DefaultQueueRegistry implements QueueRegistry } } - public AMQQueue getQueue(AMQShortString name) + private AMQQueue getQueue(AMQShortString name) { return _queueMap.get(name); } - public Collection<AMQShortString> getQueueNames() - { - return _queueMap.keySet(); - } - public Collection<AMQQueue> getQueues() { return _queueMap.values(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java new file mode 100644 index 0000000000..f5bee850c2 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.model.Queue; + +public class QueueArgumentsConverter +{ + public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity"; + public static final String X_QPID_CAPACITY = "x-qpid-capacity"; + public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap"; + public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count"; + public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size"; + public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age"; + public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth"; + + public static final String QPID_ALERT_COUNT = "qpid.alert_count"; + public static final String QPID_ALERT_SIZE = "qpid.alert_size"; + public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap"; + + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; + + public static final String X_QPID_DESCRIPTION = "x-qpid-description"; + /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + */ + public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + + public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; + public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; + public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; + public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; + public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude"; + public static final String QPID_TRACE_ID = "qpid.trace.id"; + + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + + /** + * No-local queue argument is used to support the no-local feature of Durable Subscribers. + */ + public static final String QPID_NO_LOCAL = "no-local"; + static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>(); + static + { + ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_AGE, Queue.ALERT_THRESHOLD_MESSAGE_AGE); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_SIZE, Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_QUEUE_DEPTH, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_SIZE, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); + + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS); + + ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + + ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY); + ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY); + ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES); + + ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); + + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION); + ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); + //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS); + ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); + ATTRIBUTE_MAPPINGS.put(QPID_TRACE_EXCLUDE, Queue.FEDERATION_EXCLUDES); + ATTRIBUTE_MAPPINGS.put(QPID_TRACE_ID, Queue.FEDERATION_ID); + ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); + + } + + + public static Map<String,Object> convertWireArgsToModel(Map<String,Object> wireArguments) + { + Map<String,Object> modelArguments = new HashMap<String, Object>(); + if(wireArguments != null) + { + for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet()) + { + if(wireArguments.containsKey(entry.getKey())) + { + modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey())); + } + } + if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) + { + modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + } + if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) + { + modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, + SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(wireArguments.get(QPID_SHARED_MSG_GROUP))); + } + if(wireArguments.get(X_QPID_DLQ_ENABLED) != null) + { + modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString())); + } + + if(wireArguments.get(QPID_NO_LOCAL) != null) + { + modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString())); + } + + } + return modelArguments; + } + + + public static Map<String,Object> convertModelArgsToWire(Map<String,Object> modelArguments) + { + Map<String,Object> wireArguments = new HashMap<String, Object>(); + for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet()) + { + if(modelArguments.containsKey(entry.getValue())) + { + wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue())); + } + } + + if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + { + wireArguments.put(QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + } + + return wireArguments; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java new file mode 100644 index 0000000000..5411a2bc9c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; + +public interface QueueFactory +{ + AMQQueue createAMQQueueImpl(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index e8c34128e9..bc1d5942bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -32,11 +31,7 @@ public interface QueueRegistry void registerQueue(AMQQueue queue); - void unregisterQueue(AMQShortString name); - - AMQQueue getQueue(AMQShortString name); - - Collection<AMQShortString> getQueueNames(); + void unregisterQueue(String name); Collection<AMQQueue> getQueues(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b0ab93162a..e3dbd62b6c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; @@ -68,12 +69,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; - public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; - private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; private static final String QPID_NO_GROUP = "qpid.no-group"; private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); + // TODO - should make this configurable at the vhost / broker level private static final int DEFAULT_MAX_GROUPS = 255; @@ -237,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments); + _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); @@ -255,19 +254,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes durable, !durable, _entries.getPriorities() > 0)); - if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) + if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE)) + if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG); + Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), + new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS); + _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } else @@ -358,13 +359,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _alternateExchange = exchange; } - /** - * Arguments used to create this queue. The caller is assured - * that null will never be returned. - */ - public Map<String, Object> getArguments() + + @Override + public Collection<String> getAvailableAttributes() + { + return new ArrayList<String>(_arguments.keySet()); + } + + @Override + public Object getAttribute(String attrName) { - return _arguments; + return _arguments.get(attrName); } public boolean isAutoDelete() @@ -511,7 +516,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _logger.info("Auto-deleteing queue:" + this); } - delete(); + getVirtualHost().removeQueue(this); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -1340,7 +1345,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - _virtualHost.getQueueRegistry().unregisterQueue(_name); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -2282,18 +2286,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { if (description == null) { - _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION); + _arguments.remove(Queue.DESCRIPTION); } else { - _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description); + _arguments.put(Queue.DESCRIPTION, description); } } @Override public String getDescription() { - return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION); + return (String) _arguments.get(Queue.DESCRIPTION); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 931368cb97..960986ec45 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -167,12 +167,12 @@ public class SecurityManager implements ConfigurationChangeListener { String pluginTypeName = getPluginTypeName(accessControl); _hostPlugins.put(pluginTypeName, accessControl); - + if(_logger.isDebugEnabled()) { _logger.debug("Added access control to host plugins with name: " + vhostName); } - + break; } } @@ -366,7 +366,7 @@ public class SecurityManager implements ConfigurationChangeListener } public boolean authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive, - final Boolean nowait, final Boolean passive, final AMQShortString queueName, final String owner) + final Boolean nowait, final Boolean passive, final String queueName, final String owner) { return checkAllPlugins(new AccessCheck() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index 6c631fc360..893b371d11 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -212,7 +212,7 @@ public class ObjectProperties } public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive, - AMQShortString queueName, String owner) + String queueName, String owner) { super(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index efb1e95e99..e9181c0e12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; public class DurableConfigurationStoreHelper { @@ -46,28 +48,23 @@ public class DurableConfigurationStoreHelper attributesMap.put(Queue.NAME, queue.getName()); attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (queue.getAlternateExchange() != null) { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - else - { - attributesMap.remove(Queue.ALTERNATE_EXCHANGE); - } - if (attributesMap.containsKey(Queue.ARGUMENTS)) - { - // We wouldn't need this if createQueueConfiguredObject took only AMQQueue - Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS); - currentArgs.putAll(queue.getArguments()); - } - else + + Collection<String> availableAttrs = queue.getAvailableAttributes(); + + for(String attrName : availableAttrs) { - attributesMap.put(Queue.ARGUMENTS, queue.getArguments()); + attributesMap.put(attrName, queue.getAttribute(attrName)); } + store.update(queue.getId(), QUEUE, attributesMap); } - public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments) + public static void createQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { Map<String, Object> attributesMap = new HashMap<String, Object>(); @@ -78,11 +75,9 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments. - // It would also do away with the need for the if/then/else within updateQueueConfiguredObject - if (arguments != null) + for(String attrName : queue.getAvailableAttributes()) { - attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments)); + attributesMap.put(attrName, queue.getAttribute(attrName)); } store.create(queue.getId(), QUEUE,attributesMap); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4e27a008dd..d87431a415 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -58,11 +59,13 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { @@ -95,6 +98,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; + private final AMQQueueFactory _queueFactory; private volatile State _state = State.INITIALISING; @@ -136,11 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); + _queueRegistry = new DefaultQueueRegistry(this); + _queueFactory = new AMQQueueFactory(this, _queueRegistry); + _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeRegistry = new DefaultExchangeRegistry(this); + _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry); initialiseStatistics(); @@ -298,12 +305,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration); String queueName = queue.getName(); if (queue.isDurable()) { - DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null); + DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); } //get the exchange name (returns default exchange name if none was specified) @@ -428,12 +435,102 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override + public AMQQueue getQueue(String name) + { + return _queueRegistry.getQueue(name); + } + + @Override + public AMQQueue getQueue(UUID id) + { + return _queueRegistry.getQueue(id); + } + + @Override + public Collection<AMQQueue> getQueues() + { + return _queueRegistry.getQueues(); + } + + @Override + public int removeQueue(AMQQueue queue) throws AMQException + { + synchronized (getQueueRegistry()) + { + int purged = queue.delete(); + + getQueueRegistry().unregisterQueue(queue.getName()); + if (queue.isDurable() && !queue.isAutoDelete()) + { + DurableConfigurationStore store = getDurableConfigurationStore(); + DurableConfigurationStoreHelper.removeQueue(store, queue); + } + return purged; + } + } + + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException + { + // Access check + if (!getSecurityManager().authoriseCreateQueue(autoDelete, + durable, + exclusive, + null, + null, + queueName, + owner)) + { + String description = "Permission denied: queue-name '" + queueName + "'"; + throw new AMQSecurityException(description); + } + + synchronized (_queueRegistry) + { + if(_queueRegistry.getQueue(queueName) != null) + { + throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); + } + if(id == null) + { + + id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + while(_queueRegistry.getQueue(id) != null) + { + id = UUID.randomUUID(); + } + + } + else if(_queueRegistry.getQueue(id) != null) + { + throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + } + return _queueFactory.createAMQQueueImpl(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, + arguments); + } + + } + + @Override public Exchange getExchange(String name) { return _exchangeRegistry.getExchange(name); } @Override + public Exchange getExchange(UUID id) + { + return _exchangeRegistry.getExchange(id); + } + + @Override public Exchange getDefaultExchange() { return _exchangeRegistry.getDefaultExchange(); @@ -747,7 +844,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(this, getExchangeRegistry()), + new QueueRecoverer(this, getExchangeRegistry(), _queueFactory), new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()), new BindingRecoverer(this, getExchangeRegistry()) }; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 7cfadbcadf..2d3a620e91 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -91,7 +91,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B { _unresolvedDependencies.add(new ExchangeDependency()); } - _queue = _virtualHost.getQueueRegistry().getQueue(_queueId); + _queue = _virtualHost.getQueue(_queueId); if(_queue == null) { _unresolvedDependencies.add(new QueueDependency()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 3526551073..8d05e719ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; @@ -60,6 +61,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader()); case 1: currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); + case 2: + currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); + case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -213,7 +217,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider UUID queueId = UUID.fromString(queueIdString); ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())) - || _virtualHost.getQueueRegistry().getQueue(queueId) != null); + || _virtualHost.getQueue(queueId) != null); } private boolean isBinding(final String type) @@ -224,4 +228,39 @@ public class DefaultUpgraderProvider implements UpgraderProvider } + /* + * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the + * attributes into the map using the model attribute names rather than the wire attribute names + */ + private class Version2Upgrader extends NonNullUpgrader + { + + private static final String ARGUMENTS = "arguments"; + + @Override + public void configuredObject(UUID id, String type, Map<String, Object> attributes) + { + if(Queue.class.getSimpleName().equals(type)) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); + if(attributes.get(ARGUMENTS) instanceof Map) + { + newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes + .get(ARGUMENTS))); + } + newAttributes.putAll(attributes); + attributes = newAttributes; + getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 7929cd3e39..b4fbdf7544 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -20,18 +20,19 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.LinkedHashMap; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -41,11 +42,15 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); private final VirtualHost _virtualHost; private final ExchangeRegistry _exchangeRegistry; + private final QueueFactory _queueFactory; - public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry) + public QueueRecoverer(final VirtualHost virtualHost, + final ExchangeRegistry exchangeRegistry, + final QueueFactory queueFactory) { _virtualHost = virtualHost; _exchangeRegistry = exchangeRegistry; + _queueFactory = queueFactory; } @Override @@ -101,26 +106,24 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ String queueName = (String) _attributes.get(Queue.NAME); String owner = (String) _attributes.get(Queue.OWNER); boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); - @SuppressWarnings("unchecked") - Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS); + + Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes); + queueArgumentsMap.remove(Queue.NAME); + queueArgumentsMap.remove(Queue.OWNER); + queueArgumentsMap.remove(Queue.EXCLUSIVE); + try { - _queue = _virtualHost.getQueueRegistry().getQueue(_id); + _queue = _virtualHost.getQueue(_id); if(_queue == null) { - _queue = _virtualHost.getQueueRegistry().getQueue(queueName); + _queue = _virtualHost.getQueue(queueName); } if (_queue == null) { - _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost, - queueArgumentsMap); - _virtualHost.getQueueRegistry().registerQueue(_queue); - - if (_alternateExchange != null) - { - _queue.setAlternateExchange(_alternateExchange); - } + _queue = _queueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, + false, queueArgumentsMap); } } catch (AMQException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index e06e785338..2ebbedccd4 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -21,15 +21,18 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -45,7 +48,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable String getName(); - QueueRegistry getQueueRegistry(); + AMQQueue getQueue(String name); + + AMQQueue getQueue(UUID id); + + Collection<AMQQueue> getQueues(); + + int removeQueue(AMQQueue queue) throws AMQException; + + AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException; + Exchange createExchange(UUID id, String exchange, @@ -58,6 +77,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable void removeExchange(Exchange exchange, boolean force) throws AMQException; Exchange getExchange(String name); + Exchange getExchange(UUID id); + Exchange getDefaultExchange(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 3738306f6a..39ca3197b4 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements } for(Transaction.Record record : enqueues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -179,7 +179,7 @@ public class VirtualHostConfigRecoveryHandler implements } for(Transaction.Record record : dequeues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -268,7 +268,7 @@ public class VirtualHostConfigRecoveryHandler implements public void queueEntry(final UUID queueId, long messageId) { - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); + AMQQueue queue = _virtualHost.getQueue(queueId); try { if(queue != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java new file mode 100644 index 0000000000..54f7d0d172 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugins; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; + +public class QueueExistsException extends AMQException +{ + private final AMQQueue _existing; + + public QueueExistsException(String name, AMQQueue existing) + { + super(name); + _existing = existing; + } + + public AMQQueue getExistingQueue() + { + return _existing; + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index de6d036f29..dc9ddf7b32 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check that atest was a priority queue with 5 priorities - AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue atest = vhost.getQueue("atest"); assertTrue(atest instanceof AMQPriorityQueue); assertEquals(5, ((AMQPriorityQueue) atest).getPriorities()); // Check that ptest was a priority queue with 10 priorities - AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest")); + AMQQueue ptest = vhost.getQueue("ptest"); assertTrue(ptest instanceof AMQPriorityQueue); assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities()); // Check that ntest wasn't a priority queue - AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest")); + AMQQueue ntest = vhost.getQueue("ntest"); assertFalse(ntest instanceof AMQPriorityQueue); } @@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check specifically configured values - AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue aTest = vhost.getQueue("atest"); assertEquals(4, aTest.getMaximumQueueDepth()); assertEquals(5, aTest.getMaximumMessageSize()); assertEquals(6, aTest.getMaximumMessageAge()); // Check default values - AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); + AMQQueue bTest = vhost.getQueue("btest"); assertEquals(1, bTest.getMaximumQueueDepth()); assertEquals(2, bTest.getMaximumMessageSize()); assertEquals(3, bTest.getMaximumMessageAge()); @@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); // Get queues - AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); - AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); - AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); - AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + AMQQueue biggles = test.getQueue("biggles"); + AMQQueue beetle = test.getQueue("beetle"); + AMQQueue r2d2 = extra.getQueue("r2d2"); + AMQQueue c3p0 = extra.getQueue("c3p0"); // Disabled specifically for this queue, overriding virtualhost setting assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f2a64381df..7adec3d595 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); routeMessage("a.c.d.b",0l); @@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchafterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); int queueCount = routeMessage("a.b.c",0l); @@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreRouting() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 693fd16b9f..a468fa072b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.Collections; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; +import org.apache.qpid.server.model.Queue; import static org.mockito.Mockito.when; @@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest @Override public void setUp() throws Exception { - FieldTable arguments = new FieldTable(); - arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3); - setArguments(arguments); + setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3)); super.setUp(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index c8e0e53d75..62c9b4c46d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,91 +20,213 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.configuration.XMLConfiguration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class AMQQueueFactoryTest extends QpidTestCase { private QueueRegistry _queueRegistry; private VirtualHost _virtualHost; - private Broker _broker; + private AMQQueueFactory _queueFactory; + private List<AMQQueue> _queues; + private QueueConfiguration _queueConfiguration; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - XMLConfiguration configXml = new XMLConfiguration(); - configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName()); + _queues = new ArrayList<AMQQueue>(); + _virtualHost = mock(VirtualHost.class); - _broker = BrokerTestHelper.createBrokerMock(); - if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings")) - { - when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5); - when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true); - } + VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class); + when(_virtualHost.getConfiguration()).thenReturn(vhostConfig); + _queueConfiguration = mock(QueueConfiguration.class); + when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration); + LogActor logActor = mock(LogActor.class); + CurrentActor.set(logActor); + RootMessageLogger rootLogger = mock(RootMessageLogger.class); + when(logActor.getRootMessageLogger()).thenReturn(rootLogger); + DurableConfigurationStore store = mock(DurableConfigurationStore.class); + when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); + + mockExchangeCreation(); + mockQueueRegistry(); + delegateVhostQueueCreation(); + + when(_virtualHost.getQueues()).thenReturn(_queues); + + + _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry); - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker)); - _queueRegistry = _virtualHost.getQueueRegistry(); } + private void delegateVhostQueueCreation() throws AMQException + { + final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class); + + when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(), + autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then( + new Answer<AMQQueue>() + { + @Override + public AMQQueue answer(InvocationOnMock invocation) throws Throwable + { + return _queueFactory.createAMQQueueImpl(id.getValue(), + queueName.getValue(), + durable.getValue(), + owner.getValue(), + autoDelete.getValue(), + exclusive.getValue(), + deleteOnNoConsumer.getValue(), + arguments.getValue()); + } + } + ); + } + + private void mockQueueRegistry() + { + _queueRegistry = mock(QueueRegistry.class); + + final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + AMQQueue queue = capturedQueue.getValue(); + when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue); + _queues.add(queue); + + return null; + } + }).when(_queueRegistry).registerQueue(capturedQueue.capture()); + } + + private void mockExchangeCreation() throws AMQException + { + final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class); + + when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(), + anyBoolean(), anyBoolean(), anyString())).then( + new Answer<Exchange>() + { + @Override + public Exchange answer(InvocationOnMock invocation) throws Throwable + { + final String name = exchangeNameCapture.getValue(); + final UUID id = idCapture.getValue(); + + final Exchange exchange = mock(Exchange.class); + ExchangeType exType = mock(ExchangeType.class); + + when(exchange.getName()).thenReturn(name); + when(exchange.getId()).thenReturn(id); + when(exchange.getType()).thenReturn(exType); + final String typeName = type.getValue(); + when(exType.getType()).thenReturn(typeName); + when(exType.getName()).thenReturn(new AMQShortString(typeName)); + + when(_virtualHost.getExchange(eq(name))).thenReturn(exchange); + when(_virtualHost.getExchange(eq(id))).thenReturn(exchange); + + final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class); + + when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() { + + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + when(exchange.isBound(eq(queue.getValue()))).thenReturn(true); + return true; + } + }); + + return exchange; + } + } + ); + } + @Override public void tearDown() throws Exception { - try - { - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } + super.tearDown(); } private void verifyRegisteredQueueCount(int count) { - assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); + assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size()); } private void verifyQueueRegistered(String queueName) { - assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName)); } public void testPriorityQueueRegistration() throws Exception { - FieldTable fieldTable = new FieldTable(); - fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, - false, _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, + false, + false, + attributes); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testSimpleQueueRegistration() throws Exception { String queueName = getName(); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, + false, + false, + null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced - QueueRegistry qReg = _virtualHost.getQueueRegistry(); assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); - assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName)); verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does * cause the alternate exchange to be set and DLQ to be produced. * @throws AMQException */ public void testDeadLetterQueueEnabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception { + String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); + when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5); + when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true); - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + null); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); @@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not * result in the alternate exchange being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueDisabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false); String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but * creating an auto-delete queue, does not result in the alternate exchange * being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); //create an autodelete queue - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, + false, + attributes); assertTrue("Queue should be autodelete", queue.isAutoDelete()); //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has * the desired effect. */ public void testMaximumDeliveryCount() throws Exception { - final FieldTable fieldTable = new FieldTable(); - fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); @@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means * that queue is created with a default maximumDeliveryCount of zero (unless set in config). */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, null); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + null); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); @@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, + false, + null); fail("queue with null name can not be created!"); } catch (Exception e) @@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testMessageGroupFromConfig() throws Exception { - PropertiesConfiguration queueConfig = new PropertiesConfiguration(); - queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey"); - queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1"); + Map<String,String> arguments = new HashMap<String, String>(); + arguments.put("qpid.group_header_key","mykey"); + arguments.put("qpid.shared_msg_group","1"); + QueueConfiguration qConf = mock(QueueConfiguration.class); + when(qConf.getArguments()).thenReturn(arguments); + when(qConf.getName()).thenReturn("test"); - final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);; - QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost); - assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY)); - assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf); + assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); + assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); } private String generateStringWithLength(char ch, int length) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b677ece408..e490db288c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue } - public Map<String, Object> getArguments() + @Override + public Collection<String> getAvailableAttributes() + { + return null; + } + + @Override + public Object getAttribute(String attrName) { return null; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0faa796f1c..2328745b83 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; @@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase private AMQShortString _routingKey = new AMQShortString("routing key"); private DirectExchange _exchange; private MockSubscription _subscription = new MockSubscription(); - private FieldTable _arguments = null; + private Map<String,Object> _arguments = null; private MessagePublishInfo info = new MessagePublishInfo() { @@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), - false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), + false, false, false, _arguments); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString()); } @@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testCreateQueue() throws AMQException { _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + try + { + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, + false, _owner.asString(), false, + false, false, _arguments); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("name")); } - try { + try + { _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } @@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, - false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), + "differentName", false, + _owner.asString(), false, + false, false, _arguments); assertNotNull("Queue was not created", _queue); } @@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase return _subscription; } - public FieldTable getArguments() + public Map<String,Object> getArguments() { return _arguments; } - public void setArguments(FieldTable arguments) + public void setArguments(Map<String,Object> arguments) { _arguments = arguments; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 4abb7233dc..c115af5a38 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase try { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false, - "owner", false, false, test, null); + SimpleAMQQueue queue = (SimpleAMQQueue) + test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 67cf0780da..e9ad4ba236 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { @@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testBindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUnbindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueue() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); Map<String, Object> queueAttributes = new HashMap<String, Object>(); @@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueueFieldTable() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { Exchange alternateExchange = createTestAlternateExchange(); - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueExclusivity() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false - queue = createTestQueue(getName(), getName() + "Owner", false); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueAlternateExchange() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); - queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testRemoveQueue() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); @@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest anyMap()); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + final Map<String, Object> arguments) throws AMQStoreException { - return createTestQueue(queueName, queueOwner, exclusive, null); + return createTestQueue(queueName, queueOwner, exclusive, null, arguments); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + Exchange alternateExchange, + final Map<String, Object> arguments) throws AMQStoreException { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); @@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); + if(arguments != null && !arguments.isEmpty()) + { + when(queue.getAvailableAttributes()).thenReturn(arguments.keySet()); + final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class); + when(queue.getAttribute(requestedAttribute.capture())).then( + new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + String attrName = requestedAttribute.getValue(); + return arguments.get(attrName); + } + }); + } + return queue; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index caf6acb4bb..cb1fc2737d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -181,9 +181,8 @@ public class BrokerTestHelper public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, - false, false, virtualHost, Collections.<String, Object>emptyMap()); - virtualHost.getQueueRegistry().registerQueue(queue); + SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, + false, false, false, Collections.<String, Object>emptyMap()); return queue; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 2d3483f078..66a71c562f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; @@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; private ExchangeRegistry _exchangeRegistry; - private QueueRegistry _queueRegistry; + private QueueFactory _queueFactory; @Override public void setUp() throws Exception @@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class); doAnswer(new Answer() @@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public Object answer(final InvocationOnMock invocation) throws Throwable { Exchange exchange = registeredExchange.getValue(); - when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange); - when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange); return null; } }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); - _queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry); + final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class); - when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + _queueFactory = mock(QueueFactory.class); - final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class); - doAnswer(new Answer() - { + when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(), + anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( + new Answer() + { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - AMQQueue queue = registeredQueue.getValue(); - when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue); - when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue); - return null; - } - }).when(_queueRegistry).registerQueue(registeredQueue.capture()); + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + final AMQQueue queue = mock(AMQQueue.class); + + final String queueName = queueArg.getValue(); + final UUID queueId = idArg.getValue(); + + when(queue.getName()).thenReturn(queueName); + when(queue.getId()).thenReturn(queueId); + when(_vhost.getQueue(eq(queueName))).thenReturn(queue); + when(_vhost.getQueue(eq(queueId))).thenReturn(queue); + + final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class); + doAnswer( + new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Exchange value = altExchangeArg.getValue(); + when(queue.getAlternateExchange()).thenReturn(value); + return null; + } + } + ).when(queue).setAlternateExchange(altExchangeArg.capture()); + + Map args = argsArg.getValue(); + if(args.containsKey(Queue.ALTERNATE_EXCHANGE)) + { + final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); + final Exchange exchange = _exchangeRegistry.getExchange(exchangeId); + queue.setAlternateExchange(exchange); + } + return queue; + } + }); _exchangeFactory = mock(ExchangeFactory.class); + DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _exchangeRegistry), + new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory), new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), new BindingRecoverer(_vhost, _exchangeRegistry) }; @@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final UUID queueId = new UUID(1, 0); final UUID exchangeId = new UUID(2, 0); - /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than - queue creation being on vhost - yuck! */ - SecurityManager securityManager = mock(SecurityManager.class); - when(_vhost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(), - any(AMQShortString.class),anyString())).thenReturn(true); - VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class); - when(_vhost.getConfiguration()).thenReturn(configuration); - QueueConfiguration queueConfiguration = mock(QueueConfiguration.class); - when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration); - LogActor logActor = mock(LogActor.class); - CurrentActor.set(logActor); - RootMessageLogger rootLogger = mock(RootMessageLogger.class); - when(logActor.getRootMessageLogger()).thenReturn(rootLogger); - /* end of queue creation mock hackery */ - final Exchange customExchange = mock(Exchange.class); + when(customExchange.getId()).thenReturn(exchangeId); + when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); + when(_exchangeFactory.createExchange(eq(exchangeId), eq(CUSTOM_EXCHANGE_NAME), eq(HeadersExchange.TYPE.getType()), @@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.completeConfigurationRecovery(); - assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange); + assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange()); } private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 6769c1c2fc..1ca7ff1b65 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost } @Override + public AMQQueue getQueue(String name) + { + return null; + } + + @Override + public AMQQueue getQueue(UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue> getQueues() + { + return null; + } + + @Override + public int removeQueue(AMQQueue queue) throws AMQException + { + return 0; + } + + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException + { + return null; + } + + @Override public Exchange createExchange(UUID id, String exchange, String type, @@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost } @Override + public Exchange getExchange(UUID id) + { + return null; + } + + @Override public Exchange getDefaultExchange() { return null; diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index e72196c383..03cb483e40 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase VirtualHost vhost = createVirtualHost(vhostName, config); assertNotNull("virtualhost should exist", vhost); - AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + AMQQueue queue = vhost.getQueue(queueName); assertNotNull("queue should exist", queue); Exchange defaultExch = vhost.getDefaultExchange(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java index 602bdb66b5..c9f3aca91f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java @@ -97,7 +97,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging String clientID = _connection.getClientID(); assertNotNull("clientID should not be null", clientID); - + validateQueueProperties(results, false, false, clientID); } @@ -256,7 +256,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging validateQueueProperties(results, true, true, null); } - + private List<String> waitForMesssage() throws IOException { // Validation @@ -267,14 +267,14 @@ public class DurableQueueLoggingTest extends AbstractTestLogging // Only 1 Queue message should hav been logged assertEquals("Result set size not as expected", 1, results.size()); - + return results; } public void validateQueueProperties(List<String> results, boolean hasPriority, boolean hasAutodelete, String clientID) { String log = getLogMessage(results, 0); - + // Message Should be a QUE-1001 validateMessageID("QUE-1001", log); @@ -290,7 +290,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging fromMessage(log).contains("Priority: " + PRIORITIES)); // Queue is AutoDelete - assertEquals("Unexpected AutoDelete status:" + fromMessage(log), hasAutodelete, + assertEquals("Unexpected AutoDelete status:" + fromMessage(log), hasAutodelete, fromMessage(log).contains("AutoDelete")); if(clientID != null) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java index 14dee60124..844e3ecc11 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -336,7 +336,7 @@ public class SortedQueueTest extends QpidBrokerTestCase private Queue createQueue() throws AMQException, JMSException { final Map<String, Object> arguments = new HashMap<String, Object>(); - arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY); + arguments.put(QueueArgumentsConverter.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY); ((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments); final Queue queue = new AMQQueue("amq.direct", getTestQueueName()); ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index a3551b8952..a57eca23bd 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -38,6 +39,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.model.Broker; @@ -49,7 +51,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.protocol.v0_8.IncomingMessage; -import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -83,17 +85,17 @@ public class MessageStoreTest extends QpidTestCase private String directExchangeName = "MST-DirectExchange"; private String topicExchangeName = "MST-TopicExchange"; - private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); - private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); - private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); - private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); + private String durablePriorityTopicQueueName = "MST-PriorityTopicQueue-Durable"; + private String durableTopicQueueName = "MST-TopicQueue-Durable"; + private String priorityTopicQueueName = "MST-PriorityTopicQueue"; + private String topicQueueName = "MST-TopicQueue"; - private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); - private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); - private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable"); - private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); - private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); - private AMQShortString queueName = new AMQShortString("MST-Queue"); + private String durableExclusiveQueueName = "MST-Queue-Durable-Exclusive"; + private String durablePriorityQueueName = "MST-PriorityQueue-Durable"; + private String durableLastValueQueueName = "MST-LastValueQueue-Durable"; + private String durableQueueName = "MST-Queue-Durable"; + private String priorityQueueName = "MST-PriorityQueue"; + private String queueName = "MST-Queue"; private AMQShortString directRouting = new AMQShortString("MST-direct"); private AMQShortString topicRouting = new AMQShortString("MST-topic"); @@ -202,7 +204,7 @@ public class MessageStoreTest extends QpidTestCase */ public void testQueueExchangeAndBindingCreation() throws Exception { - assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size()); + assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size()); createAllQueues(); createAllTopicQueues(); @@ -240,7 +242,7 @@ public class MessageStoreTest extends QpidTestCase validateMessageOnTopics(2, true); assertEquals("Not all queues correctly registered", - 10, getVirtualHost().getQueueRegistry().getQueues().size()); + 10, getVirtualHost().getQueues().size()); } /** @@ -269,13 +271,11 @@ public class MessageStoreTest extends QpidTestCase { testMessagePersistence(); - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", - 6, queueRegistry.getQueues().size()); + 6, getVirtualHost().getQueues().size()); //clear the queue - queueRegistry.getQueue(durableQueueName).clearQueue(); + _virtualHost.getQueue(durableQueueName).clearQueue(); //check the messages are gone validateMessageOnQueue(durableQueueName, 0); @@ -294,7 +294,7 @@ public class MessageStoreTest extends QpidTestCase public void testQueuePersistence() throws Exception { assertEquals("Should not be any existing queues", - 0, getVirtualHost().getQueueRegistry().getQueues().size()); + 0, getVirtualHost().getQueues().size()); //create durable and non durable queues/topics createAllQueues(); @@ -303,20 +303,18 @@ public class MessageStoreTest extends QpidTestCase //reload the virtual host, prompting recovery of the queues/topics reloadVirtualHost(); - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after recovery", - 6, queueRegistry.getQueues().size()); + 6, getVirtualHost().getQueues().size()); //Validate the non-Durable Queues were not recovered. assertNull("Non-Durable queue still registered:" + priorityQueueName, - queueRegistry.getQueue(priorityQueueName)); + getVirtualHost().getQueue(priorityQueueName)); assertNull("Non-Durable queue still registered:" + queueName, - queueRegistry.getQueue(queueName)); + getVirtualHost().getQueue(queueName)); assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, - queueRegistry.getQueue(priorityTopicQueueName)); + getVirtualHost().getQueue(priorityTopicQueueName)); assertNull("Non-Durable queue still registered:" + topicQueueName, - queueRegistry.getQueue(topicQueueName)); + getVirtualHost().getQueue(topicQueueName)); //Validate normally expected properties of Queues/Topics validateDurableQueueProperties(); @@ -336,27 +334,24 @@ public class MessageStoreTest extends QpidTestCase //Register Durable Queue createQueue(durableQueueName, false, true, false, false); - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered before recovery", - 1, queueRegistry.getQueues().size()); + 1, getVirtualHost().getQueues().size()); reloadVirtualHost(); - queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after first recovery", - 1, queueRegistry.getQueues().size()); + 1, getVirtualHost().getQueues().size()); //test that removing the queue means it is not recovered next time - final AMQQueue queue = queueRegistry.getQueue(durableQueueName); + final AMQQueue queue = getVirtualHost().getQueue(durableQueueName); DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); reloadVirtualHost(); - queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of queues registered after second recovery", - 0, queueRegistry.getQueues().size()); + 0, getVirtualHost().getQueues().size()); assertNull("Durable queue was not removed:" + durableQueueName, - queueRegistry.getQueue(durableQueueName)); + getVirtualHost().getQueue(durableQueueName)); } /** @@ -450,34 +445,30 @@ public class MessageStoreTest extends QpidTestCase */ public void testDurableBindingRemoval() throws Exception { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - //create durable queue and exchange, bind them Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true); createQueue(durableQueueName, false, true, false, false); - bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false, null); assertEquals("Incorrect number of bindings registered before recovery", - 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); //verify binding is actually normally recovered reloadVirtualHost(); - queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of bindings registered after first recovery", - 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); exch = getVirtualHost().getExchange(directExchangeName); assertNotNull("Exchange was not recovered", exch); //remove the binding and verify result after recovery - unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + unbindQueueFromExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false, null); reloadVirtualHost(); - queueRegistry = getVirtualHost().getQueueRegistry(); assertEquals("Incorrect number of bindings registered after second recovery", - 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); + 0, getVirtualHost().getQueue(durableQueueName).getBindings().size()); } /** @@ -514,15 +505,14 @@ public class MessageStoreTest extends QpidTestCase /** Validates the Durable queues and their properties are as expected following recovery */ private void validateBindingProperties() { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size()); + assertEquals("Incorrect number of (durable) queues following recovery", 6, getVirtualHost().getQueues().size()); - validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false); - validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true); - validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false); - validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true); - validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false); + validateBindingProperties(getVirtualHost().getQueue(durablePriorityQueueName).getBindings(), false); + validateBindingProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName).getBindings(), true); + validateBindingProperties(getVirtualHost().getQueue(durableQueueName).getBindings(), false); + validateBindingProperties(getVirtualHost().getQueue(durableTopicQueueName).getBindings(), true); + validateBindingProperties(getVirtualHost().getQueue(durableExclusiveQueueName).getBindings(), false); } /** @@ -550,18 +540,14 @@ public class MessageStoreTest extends QpidTestCase private void setQueueExclusivity(boolean exclusive) throws AMQException { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); queue.setExclusive(exclusive); } private void validateQueueExclusivityProperty(boolean expected) { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); } @@ -569,14 +555,12 @@ public class MessageStoreTest extends QpidTestCase private void validateDurableQueueProperties() { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false); - validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true); + validateQueueProperties(getVirtualHost().getQueue(durablePriorityQueueName), true, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName), true, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableQueueName), false, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableTopicQueueName), false, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableExclusiveQueueName), false, true, true, false); + validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true); } private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) @@ -724,10 +708,10 @@ public class MessageStoreTest extends QpidTestCase createQueue(topicQueueName, false, false, false, false); } - private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) { - FieldTable queueArguments = null; + Map<String,Object> queueArguments = null; if(usePriority || lastValueQueue) { @@ -736,14 +720,12 @@ public class MessageStoreTest extends QpidTestCase if (usePriority) { - queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); + queueArguments = Collections.singletonMap(Queue.PRIORITIES, (Object) DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) { - queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY); + queueArguments = Collections.singletonMap(Queue.LVQ_KEY, (Object) LVQ_KEY); } AMQQueue queue = null; @@ -751,25 +733,17 @@ public class MessageStoreTest extends QpidTestCase //Ideally we would be able to use the QueueDeclareHandler here. try { - queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName.asString(), durable, queueOwner.asString(), false, exclusive, - getVirtualHost(), FieldTable.convertToMap(queueArguments)); + queue = getVirtualHost().createQueue(UUIDGenerator.generateRandomUUID(), queueName, durable, queueOwner.asString(), false, exclusive, + false, queueArguments); validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(), - queue, - queueArguments); - } } catch (AMQException e) { fail(e.getMessage()); } - getVirtualHost().getQueueRegistry().registerQueue(queue); - } private Map<String, Exchange> createExchanges() @@ -805,28 +779,24 @@ public class MessageStoreTest extends QpidTestCase private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(QueueArgumentsConverter.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false, null); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(queueName), false, null); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false, null); } private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + queueArguments.put(new AMQShortString(QueueArgumentsConverter.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableTopicQueueName), true, null); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(topicQueueName), true, null); } @@ -894,9 +864,9 @@ public class MessageStoreTest extends QpidTestCase } } - private void validateMessageOnQueue(AMQShortString queueName, long messageCount) + private void validateMessageOnQueue(String queueName, long messageCount) { - AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName); + AMQQueue queue = getVirtualHost().getQueue(queueName); assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index cf066e3c01..2d6943f643 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.NotificationCheckTest; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueueTest; import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; @@ -168,7 +169,7 @@ public class QueueManagementTest extends QpidBrokerTestCase public void testNewQueueWithDescription() throws Exception { String queueName = getTestQueueName(); - Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); @@ -181,7 +182,7 @@ public class QueueManagementTest extends QpidBrokerTestCase public void testQueueDescriptionSurvivesRestart() throws Exception { String queueName = getTestQueueName(); - Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); @@ -195,7 +196,7 @@ public class QueueManagementTest extends QpidBrokerTestCase } /** - * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests + * Tests queue creation with {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. */ public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception @@ -204,7 +205,7 @@ public class QueueManagementTest extends QpidBrokerTestCase final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); final Integer deliveryCount = 1; - final Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); + final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); managedBroker.createNewQueue(queueName, null, true, arguments); // Ensure the queue exists @@ -225,10 +226,10 @@ public class QueueManagementTest extends QpidBrokerTestCase final Long maximumQueueDepth = 300l; final Long maximumMessageAge = 400l; final Map<String, Object> arguments = new HashMap<String, Object>(); - arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount); - arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize); - arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth); - arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge); managedBroker.createNewQueue(queueName, null, true, arguments); @@ -642,7 +643,7 @@ public class QueueManagementTest extends QpidBrokerTestCase final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); final Object messageGroupKey = "test"; - final Map<String, Object> arguments = Collections.singletonMap(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey); + final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); managedBroker.createNewQueue(queueName, null, true, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); @@ -659,8 +660,8 @@ public class QueueManagementTest extends QpidBrokerTestCase final Object messageGroupKey = "test"; final Map<String, Object> arguments = new HashMap<String, Object>(2); - arguments.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey); - arguments.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); managedBroker.createNewQueue(queueName, null, true, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 940d6a3298..969f222316 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; @@ -291,7 +292,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "lvq", lvqQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); + assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); } public void testPutCreateSortedQueueWithoutKey() throws Exception @@ -460,7 +461,7 @@ public class VirtualHostRestTest extends QpidRestTestCase String queueName = getTestQueueName(); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); //verify the starting state Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 67a2988ad1..bd826259bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -200,8 +200,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "{" + "exclusive: true," + "arguments: {" + - "'qpid.max_size': 1000," + - "'qpid.max_count': 100" + + "'qpid.alert_size': 1000," + + "'qpid.alert_count': 100" + "}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + @@ -401,7 +401,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "x-declare: " + "{ " + "auto-delete: true," + - "arguments: {'qpid.max_count': 100}" + + "arguments: {'qpid.alert_count': 100}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.topic', key : 'a.#'}," + @@ -485,7 +485,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Hashtable<String,String> map = new Hashtable<String,String>(); map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + - "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}"); + "{x-declare: {auto-delete: true, arguments : {'qpid.alert_size': 1000}}}}"); map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); @@ -603,7 +603,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String addr = "ADDR:amq.direct/x512; {" + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + - "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; + "arguments : {'qpid.alert_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); cons = ssn.createConsumer(queue); @@ -1403,7 +1403,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," + "alternate-exchange: 'amq.fanout'," + - "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" + + "arguments: {'qpid.alert_size': 1000,'qpid.alert_count': 100}" + "}"; String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}"; |
