diff options
Diffstat (limited to 'java/broker/src')
5 files changed, 360 insertions, 267 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 53c36d9718..e1c1de29bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -37,6 +38,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,10 +157,7 @@ public class AMQQueue implements Managable, Comparable /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); - public int compareTo(Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -950,4 +950,20 @@ public class AMQQueue implements Managable, Comparable return new QueueEntry(this, amqMessage); } + public int compareTo(Object o) + { + return _name.compareTo(((AMQQueue) o).getName()); + } + + + public void removeExpiredIfNoSubscribers() throws AMQException + { + synchronized(_subscribers.getChangeLock()) + { + if(_subscribers.isEmpty()) + { + _deliveryMgr.removeExpired(); + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 3cf2cb9b12..a7cef1b983 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -212,6 +212,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * NOTE : This method should only be called when there are no active subscribers + */ + public void removeExpired() throws AMQException + { + _lock.lock(); + + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + { + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); + _queue.dequeue(_reapingStoreContext,entry); + iter.remove(); + } + } + + + _lock.unlock(); + } + /** @return the state of the async processor. */ public boolean isProcessingAsync() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index f7f35a9319..1568f58e2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -97,4 +97,6 @@ interface DeliveryManager long getOldestMessageArrival(); void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg); + + void removeExpired() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b73b8d7e07..b2f8cae8ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -37,7 +37,8 @@ class SubscriptionSet implements WeightedSubscriptionManager /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _subscriptionsChange = new Object(); + + private final Object _changeLock = new Object(); /** Accessor for unit tests. */ @@ -48,7 +49,7 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - synchronized (_subscriptionsChange) + synchronized (_changeLock) { _subscriptions.add(subscription); } @@ -66,7 +67,7 @@ class SubscriptionSet implements WeightedSubscriptionManager // TODO: possibly need O(1) operation here. Subscription sub = null; - synchronized (_subscriptionsChange) + synchronized (_changeLock) { int subIndex = _subscriptions.indexOf(subscription); @@ -226,4 +227,11 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _subscriptions.size(); } + + + public Object getChangeLock() + { + return _changeLock; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8d6a26fdbc..9addf83e01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,260 +1,303 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
- /**
- * Used for testing only
- * @param name
- * @param store
- * @throws Exception
- */
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- /**
- * Normal Constructor
- * @param name
- * @param hostConfig
- * @throws Exception
- */
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import javax.management.NotCompliantMBeanException; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.AMQException; + +import java.util.Timer; +import java.util.TimerTask; + +public class VirtualHost implements Accessable +{ + private static final Logger _logger = Logger.getLogger(VirtualHost.class); + + + private final String _name; + + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + protected VirtualHostMBean _virtualHostMBean; + + private AMQBrokerManagerMBean _brokerMBean; + + private AuthenticationManager _authenticationManager; + + private AccessManager _accessManager; + + private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true); + + private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; + + public void setAccessableName(String name) + { + _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" + + name + ") ignored remains :" + getAccessableName()); + } + + public String getAccessableName() + { + return _name; + } + + + /** + * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, "VirtualHost"); + } + + public String getObjectInstanceName() + { + return _name.toString(); + } + + public String getName() + { + return _name.toString(); + } + + public VirtualHost getVirtualHost() + { + return VirtualHost.this; + } + + + } // End of MBean class + + /** + * Used for testing only + * @param name + * @param store + * @throws Exception + */ + public VirtualHost(String name, MessageStore store) throws Exception + { + this(name, null, store); + } + + /** + * Normal Constructor + * @param name + * @param hostConfig + * @throws Exception + */ + public VirtualHost(String name, Configuration hostConfig) throws Exception + { + this(name, hostConfig, null); + } + + public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + { + _name = name; + + _virtualHostMBean = new VirtualHostMBean(); + // This isn't needed to be registered + //_virtualHostMBean.register(); + + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); + + if (store != null) + { + _messageStore = store; + } + else + { + if (hostConfig == null) + { + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + } + initialiseMessageStore(hostConfig); + } + + _exchangeRegistry.initialise(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + _accessManager = new AccessManagerImpl(name, hostConfig); + + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); + _brokerMBean.register(); + initialiseHouseKeeping(hostConfig); + } + + private void initialiseHouseKeeping(final Configuration hostConfig) + { + + long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); + + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ + if(period != 0L) + { + class RemoveExpiredMessagesTask extends TimerTask + { + public void run() + { + for(AMQQueue q : _queueRegistry.getQueues()) + { + + try + { + q.removeExpiredIfNoSubscribers(); + } + catch (AMQException e) + { + _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e); + throw new RuntimeException(e); + } + } + } + } + + _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), + period/2, + period); + } + } + + private void initialiseMessageStore(Configuration config) throws Exception + { + String messageStoreClass = config.getString("store.class"); + + Class clazz = Class.forName(messageStoreClass); + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _messageStore = (MessageStore) o; + _messageStore.configure(this, "store", config); + } + + + public <T> T getConfiguredObject(Class<T> instanceType, Configuration config) + { + T instance; + try + { + instance = instanceType.newInstance(); + } + catch (Exception e) + { + _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + } + Configurator.configure(instance); + + return instance; + } + + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ApplicationRegistry getApplicationRegistry() + { + throw new UnsupportedOperationException(); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public AccessManager getAccessManager() + { + return _accessManager; + } + + public void close() throws Exception + { + if (_messageStore != null) + { + _messageStore.close(); + } + } + + public ManagedObject getBrokerMBean() + { + return _brokerMBean; + } + + public ManagedObject getManagedObject() + { + return _virtualHostMBean; + } +} |