diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 103 |
1 files changed, 20 insertions, 83 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d42bd6cf03..73c2870b9b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -41,11 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.QueueConfigType; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.AbstractConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -55,7 +52,6 @@ import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager; @@ -135,23 +131,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicInteger _bindingCountHigh = new AtomicInteger(); /** max allowed size(KB) of a single message */ - private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); + private long _maximumMessageSize; /** max allowed number of messages on a queue. */ - private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); + private long _maximumMessageCount; /** max queue depth for the queue */ - private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); + private long _maximumQueueDepth; /** maximum message age before alerts occur */ - private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); + private long _maximumMessageAge; /** the minimum interval between sending out consecutive alerts of the same type */ - private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); + private long _minimumAlertRepeatGap; - private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _capacity; - private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + private long _flowResumeCapacity; private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); @@ -185,11 +181,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - private UUID _qmfId; - private ConfigurationPlugin _queueConfiguration; + private AbstractConfiguration _queueConfiguration; /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ - private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); + private int _maximumDeliveryCount; private final MessageGroupManager _messageGroupManager; private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = @@ -243,7 +238,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments); _id = id; - _qmfId = getConfigStore().createId(); _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); @@ -259,8 +253,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes durable, !durable, _entries.getPriorities() > 0)); - getConfigStore().addConfiguredObject(this); - if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) { if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1")) @@ -331,22 +323,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _id; } - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public QueueConfigType getConfigType() - { - return QueueConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - public boolean isDurable() { return _durable; @@ -621,24 +597,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes break; } } - - reconfigure(); - } - - private void reconfigure() - { - //Reconfigure the queue for to reflect this new binding. - ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - - if (config != null) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - // Reconfigure with new config. - configure(config); - } } public int getBindingCountHigh() @@ -649,8 +607,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void removeBinding(final Binding binding) { _bindings.remove(binding); - - reconfigure(); } public List<Binding> getBindings() @@ -1383,7 +1339,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } _virtualHost.getQueueRegistry().unregisterQueue(_name); - getConfigStore().removeConfiguredObject(this); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1442,7 +1397,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { } - }, 0L); + }); txn.dequeue(this, entry.getMessage(), new ServerTransaction.Action() { @@ -2161,39 +2116,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void configure(ConfigurationPlugin config) + public void configure(QueueConfiguration config) { if (config != null) { - if (config instanceof QueueConfiguration) - { - - setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge()); - setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth()); - setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); - setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); - setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); - setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount()); - _capacity = ((QueueConfiguration)config).getCapacity(); - _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); - } - - _queueConfiguration = config; - + setMaximumMessageAge(config.getMaximumMessageAge()); + setMaximumQueueDepth(config.getMaximumQueueDepth()); + setMaximumMessageSize(config.getMaximumMessageSize()); + setMaximumMessageCount(config.getMaximumMessageCount()); + setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + setMaximumDeliveryCount(config.getMaxDeliveryCount()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } - - public ConfigurationPlugin getConfiguration() - { - return _queueConfiguration; - } - - public ConfigStore getConfigStore() - { - return getVirtualHost().getConfigStore(); - } - public long getMessageDequeueCount() { return _dequeueCount.get(); |