summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java76
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java44
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java100
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java360
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java876
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java109
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java106
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java158
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java141
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java81
15 files changed, 2221 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
new file mode 100644
index 0000000000..2db1944cd1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+
+public abstract class HouseKeepingTask implements Runnable
+{
+ Logger _logger = Logger.getLogger(this.getClass());
+
+ private VirtualHost _virtualHost;
+
+ private String _name;
+
+ private RootMessageLogger _rootLogger;
+ public HouseKeepingTask(VirtualHost vhost)
+ {
+ _virtualHost = vhost;
+ _name = _virtualHost.getName() + ":" + this.getClass().getSimpleName();
+ _rootLogger = CurrentActor.get().getRootMessageLogger();
+ }
+
+ final public void run()
+ {
+ // Don't need to undo this as this is a thread pool thread so will
+ // always go through here before we do any real work.
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return _name;
+ }
+ });
+
+ try
+ {
+ execute();
+ }
+ catch (Throwable e)
+ {
+ _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
+ }
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ /** Execute the plugin. */
+ public abstract void execute();
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
new file mode 100644
index 0000000000..767474d5ae
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.io.IOException;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+
+/**
+ * The management interface exposed to allow management of a virtualHost
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+ static final int VERSION = 1;
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the exchange.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
new file mode 100755
index 0000000000..04f19b79bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -0,0 +1,100 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import java.util.UUID;
+
+import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
+{
+ IConnectionRegistry getConnectionRegistry();
+
+ VirtualHostConfiguration getConfiguration();
+
+ String getName();
+
+ QueueRegistry getQueueRegistry();
+
+ ExchangeRegistry getExchangeRegistry();
+
+ ExchangeFactory getExchangeFactory();
+
+ MessageStore getMessageStore();
+
+ TransactionLog getTransactionLog();
+
+ DurableConfigurationStore getDurableConfigurationStore();
+
+ AuthenticationManager getAuthenticationManager();
+
+ SecurityManager getSecurityManager();
+
+ void close();
+
+ ManagedObject getManagedObject();
+
+ UUID getBrokerId();
+
+ void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
+
+ long getHouseKeepingTaskCount();
+
+ public long getHouseKeepingCompletedTaskCount();
+
+ int getHouseKeepingPoolSize();
+
+ void setHouseKeepingPoolSize(int newSize);
+
+ int getHouseKeepingActiveCount();
+
+ IApplicationRegistry getApplicationRegistry();
+
+ BindingFactory getBindingFactory();
+
+ void createBrokerConnection(String transport,
+ String host,
+ int port,
+ String vhost,
+ boolean durable,
+ String authMechanism, String username, String password);
+
+ ConfigStore getConfigStore();
+
+ void removeBrokerConnection(BrokerLink brokerLink);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
new file mode 100755
index 0000000000..96a9ac729e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -0,0 +1,360 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
+ ConfigurationRecoveryHandler.QueueRecoveryHandler,
+ ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
+ ConfigurationRecoveryHandler.BindingRecoveryHandler,
+ MessageStoreRecoveryHandler,
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
+ TransactionLogRecoveryHandler,
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
+
+
+ private final VirtualHost _virtualHost;
+
+ private MessageStoreLogSubject _logSubject;
+ private List<ProcessAction> _actions;
+
+ private MessageStore _store;
+ private TransactionLog _transactionLog;
+
+ private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+ private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+ private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+
+
+
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public QueueRecoveryHandler begin(MessageStore store)
+ {
+ _logSubject = new MessageStoreLogSubject(_virtualHost,store);
+ _store = store;
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+
+ return this;
+ }
+
+ public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments)
+ {
+ try
+ {
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
+
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+ if (q == null)
+ {
+ q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
+ arguments);
+ _virtualHost.getQueueRegistry().registerQueue(q);
+ }
+
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
+
+ //Record that we have a queue for recovery
+ _queueRecoveries.put(queueName, 0);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ExchangeRecoveryHandler completeQueueRecovery()
+ {
+ return this;
+ }
+
+ public void exchange(String exchangeName, String type, boolean autoDelete)
+ {
+ try
+ {
+ Exchange exchange;
+ AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
+ exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
+ if (exchange == null)
+ {
+ exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ }
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public BindingRecoveryHandler completeExchangeRecovery()
+ {
+ return this;
+ }
+
+ public StoredMessageRecoveryHandler begin()
+ {
+ // TODO - log begin
+ return this;
+ }
+
+ public void message(StoredMessage message)
+ {
+ ServerMessage serverMessage;
+ switch(message.getMetaData().getType())
+ {
+ case META_DATA_0_8:
+ serverMessage = new AMQMessage(message);
+ break;
+ case META_DATA_0_10:
+ serverMessage = new MessageTransferMessage(message, null);
+ break;
+ default:
+ throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
+ }
+
+ //_logger.debug("Recovered message with id " + serverMessage);
+
+
+ _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+ _unusedMessages.put(message.getMessageNumber(), message);
+ }
+
+ public void completeMessageRecovery()
+ {
+ //TODO - log end
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
+ {
+ _transactionLog = log;
+ return this;
+ }
+
+ private static final class ProcessAction
+ {
+ private final AMQQueue _queue;
+ private final AMQMessage _message;
+
+ public ProcessAction(AMQQueue queue, AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+ public void process()
+ {
+ try
+ {
+ _queue.enqueue(_message);
+ }
+ catch(AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
+ {
+ _actions = new ArrayList<ProcessAction>();
+ try
+ {
+ Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ if (exchange == null)
+ {
+ _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName);
+ return;
+ }
+
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+ if (queue == null)
+ {
+ _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName);
+ }
+ else
+ {
+ FieldTable argumentsFT = null;
+ if(buf != null)
+ {
+ argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+ }
+
+ BindingFactory bf = _virtualHost.getBindingFactory();
+
+ Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
+
+ if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
+ {
+
+ _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+ + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+
+ bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void completeBindingRecovery()
+ {
+ //return this;
+ }
+
+ public void complete()
+ {
+
+
+ }
+
+ public void queueEntry(final String queueName, long messageId)
+ {
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
+
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+ try
+ {
+ if(queue != null)
+ {
+ ServerMessage message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
+ }
+
+ Integer count = _queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queue.enqueue(message);
+
+ _queueRecoveries.put(queueName, ++count);
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ txn.dequeueMessage(queue, messageId);
+ txn.commitTranAsync();
+ }
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ TransactionLogResource mockQueue =
+ new TransactionLogResource()
+ {
+
+ public String getResourceName()
+ {
+ return queueName;
+ }
+ };
+ txn.dequeueMessage(mockQueue, messageId);
+ txn.commitTranAsync();
+ }
+
+ }
+ catch(AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
+
+ }
+
+ public void completeQueueEntryRecovery()
+ {
+
+ for(StoredMessage m : _unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+
+ for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
+ {
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+ }
+
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
new file mode 100644
index 0000000000..33c713c62a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -0,0 +1,876 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ExchangeConfiguration;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfigType;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.connection.ConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
+import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+
+public class VirtualHostImpl implements VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
+
+ private final String _name;
+
+ private ConnectionRegistry _connectionRegistry;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private final AuthenticationManager _authenticationManager;
+
+ private SecurityManager _securityManager;
+
+ private final ScheduledThreadPoolExecutor _houseKeepingTasks;
+ private final IApplicationRegistry _appRegistry;
+ private VirtualHostConfiguration _configuration;
+ private DurableConfigurationStore _durableConfigurationStore;
+ private BindingFactory _bindingFactory;
+ private BrokerConfig _broker;
+ private UUID _id;
+
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ private final long _createTime = System.currentTimeMillis();
+ private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
+ private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return _connectionRegistry;
+ }
+
+ public VirtualHostConfiguration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public VirtualHostConfigType getConfigType()
+ {
+ return VirtualHostConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getBroker();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ /**
+ * Virtual host JMX MBean class.
+ *
+ * This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return ObjectName.quote(_name);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public VirtualHostImpl getVirtualHost()
+ {
+ return VirtualHostImpl.this;
+ }
+ }
+
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
+ {
+ this(appRegistry, hostConfig, null);
+ }
+
+
+ public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ {
+ this(ApplicationRegistry.getInstance(),hostConfig,store);
+ }
+
+ private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+
+ _appRegistry = appRegistry;
+ _broker = _appRegistry.getBroker();
+ _configuration = hostConfig;
+ _name = _configuration.getName();
+
+ _id = _appRegistry.getConfigStore().createId();
+
+ CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
+
+ if (_name == null || _name.length() == 0)
+ {
+ throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
+ }
+
+ _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
+ _securityManager.configureHostPlugins(_configuration);
+
+ _virtualHostMBean = new VirtualHostMBean();
+
+ _connectionRegistry = new ConnectionRegistry();
+
+ _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(_configuration);
+
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ StartupRoutingTable configFileRT = new StartupRoutingTable();
+
+ _durableConfigurationStore = configFileRT;
+
+ // This needs to be after the RT has been defined as it creates the default durable exchanges.
+ _exchangeRegistry.initialise();
+
+ _bindingFactory = new BindingFactory(this);
+
+ initialiseModel(_configuration);
+
+ if (store != null)
+ {
+ _messageStore = store;
+ _durableConfigurationStore = store;
+ }
+ else
+ {
+ initialiseMessageStore(hostConfig);
+ }
+
+ _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+
+ initialiseStatistics();
+ }
+
+ private void initialiseHouseKeeping(long period)
+ {
+ /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+ if (period != 0L)
+ {
+ class ExpiredMessagesTask extends HouseKeepingTask
+ {
+ public ExpiredMessagesTask(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ public void execute()
+ {
+ for (AMQQueue q : _queueRegistry.getQueues())
+ {
+ _logger.debug("Checking message status for queue: "
+ + q.getName());
+ try
+ {
+ q.checkMessageStatus();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for queue: "
+ + q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ try
+ {
+ session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+ _configuration.getTransactionTimeoutOpenClose(),
+ _configuration.getTransactionTimeoutIdleWarn(),
+ _configuration.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
+ }
+ }
+
+ scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+
+ Map<String, VirtualHostPluginFactory> plugins =
+ ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
+
+ if (plugins != null)
+ {
+ for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet())
+ {
+ String pluginName = entry.getKey();
+ VirtualHostPluginFactory factory = entry.getValue();
+ try
+ {
+ VirtualHostPlugin plugin = factory.newInstance(this);
+
+ // If we had configuration for the plugin the schedule it.
+ if (plugin != null)
+ {
+ _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2,
+ plugin.getDelay(), plugin.getTimeUnit());
+
+ _logger.info("Loaded VirtualHostPlugin:" + plugin);
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Allow other broker components to register a HouseKeepingTask
+ *
+ * @param period How often this task should run, in ms.
+ * @param task The task to run.
+ */
+ public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
+ {
+ _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public long getHouseKeepingTaskCount()
+ {
+ return _houseKeepingTasks.getTaskCount();
+ }
+
+ public long getHouseKeepingCompletedTaskCount()
+ {
+ return _houseKeepingTasks.getCompletedTaskCount();
+ }
+
+ public int getHouseKeepingPoolSize()
+ {
+ return _houseKeepingTasks.getCorePoolSize();
+ }
+
+ public void setHouseKeepingPoolSize(int newSize)
+ {
+ _houseKeepingTasks.setCorePoolSize(newSize);
+ }
+
+
+ public int getHouseKeepingActiveCount()
+ {
+ return _houseKeepingTasks.getActiveCount();
+ }
+
+
+ private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ String messageStoreClass = hostConfig.getMessageStoreClass();
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ MessageStore messageStore = (MessageStore) o;
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+
+ MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
+
+ messageStore.configureConfigStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
+ messageStore.configureMessageStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+ messageStore.configureTransactionLog(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
+ _messageStore = messageStore;
+ _durableConfigurationStore = messageStore;
+ }
+
+ private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
+ {
+ _logger.debug("Loading configuration for virtualhost: " + config.getName());
+
+ List exchangeNames = config.getExchanges();
+
+ for (Object exchangeNameObj : exchangeNames)
+ {
+ String exchangeName = String.valueOf(exchangeNameObj);
+ configureExchange(config.getExchangeConfiguration(exchangeName));
+ }
+
+ String[] queueNames = config.getQueueNames();
+
+ for (Object queueNameObj : queueNames)
+ {
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(config.getQueueConfiguration(queueName));
+ }
+ }
+
+ private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
+ {
+ AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
+
+ Exchange exchange;
+ exchange = _exchangeRegistry.getExchange(exchangeName);
+ if (exchange == null)
+ {
+
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
+ boolean durable = exchangeConfiguration.getDurable();
+ boolean autodelete = exchangeConfiguration.getAutoDelete();
+
+ Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
+ _exchangeRegistry.registerExchange(newExchange);
+
+ if (newExchange.isDurable())
+ {
+ _durableConfigurationStore.createExchange(newExchange);
+ }
+ }
+ }
+
+ private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
+ {
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+
+ if (queue.isDurable())
+ {
+ getDurableConfigurationStore().createQueue(queue);
+ }
+
+ String exchangeName = queueConfiguration.getExchange();
+
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
+
+ if (exchange == null)
+ {
+ exchange = _exchangeRegistry.getDefaultExchange();
+ }
+
+ if (exchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ }
+
+ List routingKeys = queueConfiguration.getRoutingKeys();
+ if (routingKeys == null || routingKeys.isEmpty())
+ {
+ routingKeys = Collections.singletonList(queue.getNameShortString());
+ }
+
+ for (Object routingKeyNameObj : routingKeys)
+ {
+ AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
+ }
+ _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
+ }
+
+ if (exchange != _exchangeRegistry.getDefaultExchange())
+ {
+ _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public BrokerConfig getBroker()
+ {
+ return _broker;
+ }
+
+ public String getFederationTag()
+ {
+ return _broker.getFederationTag();
+ }
+
+ public void setBroker(final BrokerConfig broker)
+ {
+ _broker = broker;
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public TransactionLog getTransactionLog()
+ {
+ return _messageStore;
+ }
+
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _durableConfigurationStore;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public SecurityManager getSecurityManager()
+ {
+ return _securityManager;
+ }
+
+ public void close()
+ {
+ //Stop Connections
+ _connectionRegistry.close();
+
+ //Stop the Queues processing
+ if (_queueRegistry != null)
+ {
+ for (AMQQueue queue : _queueRegistry.getQueues())
+ {
+ queue.stop();
+ }
+ }
+
+ //Stop Housekeeping
+ if (_houseKeepingTasks != null)
+ {
+ _houseKeepingTasks.shutdown();
+
+ try
+ {
+ if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
+ {
+ _houseKeepingTasks.shutdownNow();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage());
+ // Swallowing InterruptedException ok as we are shutting down.
+ }
+ }
+
+ //Close MessageStore
+ if (_messageStore != null)
+ {
+ //Remove MessageStore Interface should not throw Exception
+ try
+ {
+ _messageStore.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ CurrentActor.get().message(VirtualHostMessages.CLOSED());
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+
+ public UUID getBrokerId()
+ {
+ return _appRegistry.getBrokerId();
+ }
+
+ public IApplicationRegistry getApplicationRegistry()
+ {
+ return _appRegistry;
+ }
+
+ public BindingFactory getBindingFactory()
+ {
+ return _bindingFactory;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _appRegistry.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _appRegistry.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (AMQConnectionModel connection : _connectionRegistry.getConnections())
+ {
+ connection.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
+
+ public void createBrokerConnection(final String transport,
+ final String host,
+ final int port,
+ final String vhost,
+ final boolean durable,
+ final String authMechanism,
+ final String username,
+ final String password)
+ {
+ BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
+ if(_links.putIfAbsent(blink,blink) != null)
+ {
+ getConfigStore().addConfiguredObject(blink);
+ }
+ }
+
+ public void removeBrokerConnection(final String transport,
+ final String host,
+ final int port,
+ final String vhost)
+ {
+ removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null));
+
+ }
+
+ public void removeBrokerConnection(BrokerLink blink)
+ {
+ blink = _links.get(blink);
+ if(blink != null)
+ {
+ blink.close();
+ getConfigStore().removeConfiguredObject(blink);
+ }
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getApplicationRegistry().getConfigStore();
+ }
+
+ /**
+ * Temporary Startup RT class to record the creation of persistent queues / exchanges.
+ *
+ *
+ * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
+ * This should be removed after the _RT has been fully split from the the TL
+ */
+ private static class StartupRoutingTable implements DurableConfigurationStore
+ {
+ public List<Exchange> exchange = new LinkedList<Exchange>();
+ public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
+ public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
+
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void removeMessage(Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ if (exchange.isDurable())
+ {
+ this.exchange.add(exchange);
+ }
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ if (exchange.isDurable() && queue.isDurable())
+ {
+ bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ if (queue.isDurable())
+ {
+ this.queue.add(new CreateQueueTuple(queue, arguments));
+ }
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+
+ private static class CreateQueueTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+
+ public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
+ {
+ this.queue = queue;
+ this.arguments = arguments;
+ }
+ }
+
+ private static class CreateBindingTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+ public Exchange exchange;
+ public AMQShortString routingKey;
+
+ public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ {
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.queue = queue;
+ arguments = args;
+ }
+ }
+
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return _name;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
new file mode 100644
index 0000000000..32d0c4c4d1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.ConfigStore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry implements Closeable
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
+
+
+ private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
+
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+
+ public synchronized void unregisterVirtualHost(VirtualHost host)
+ {
+ _registry.remove(host.getName());
+ }
+
+ public VirtualHost getVirtualHost(String name)
+ {
+ if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
+ {
+ name = getDefaultVirtualHostName();
+ }
+
+ return _registry.get(name);
+ }
+
+ public VirtualHost getDefaultVirtualHost()
+ {
+ return getVirtualHost(getDefaultVirtualHostName());
+ }
+
+ private String getDefaultVirtualHostName()
+ {
+ return _defaultVirtualHostName;
+ }
+
+ public void setDefaultVirtualHostName(String defaultVirtualHostName)
+ {
+ _defaultVirtualHostName = defaultVirtualHostName;
+ }
+
+
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return _applicationRegistry.getConfigStore();
+ }
+
+ public void close()
+ {
+ for (VirtualHost virtualHost : getVirtualHosts())
+ {
+ virtualHost.close();
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
new file mode 100644
index 0000000000..12206013eb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.Exchange.BindingListener;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * This is a listener that caches queues that are configured for slow consumer disconnection.
+ *
+ * There should be one listener per virtual host, which can be added to all exchanges on
+ * that host.
+ *
+ * TODO In future, it will be possible to configure the policy at runtime, so only the queue
+ * itself is cached, and the configuration looked up by the housekeeping thread. This means
+ * that there may be occasions where the copy of the cache contents retrieved by the thread
+ * does not contain queues that are configured, or that configured queues are not present.
+ *
+ * @see BindingListener
+ */
+public class ConfiguredQueueBindingListener implements BindingListener
+{
+ private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
+
+ private String _vhostName;
+ private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
+
+ public ConfiguredQueueBindingListener(String vhostName)
+ {
+ _vhostName = vhostName;
+ }
+
+ /**
+ * @see BindingListener#bindingAdded(Exchange, Binding)
+ */
+ public void bindingAdded(Exchange exchange, Binding binding)
+ {
+ processBinding(binding);
+ }
+
+ /**
+ * @see BindingListener#bindingRemoved(Exchange, Binding)
+ */
+ public void bindingRemoved(Exchange exchange, Binding binding)
+ {
+ processBinding(binding);
+ }
+
+ private void processBinding(Binding binding)
+ {
+ AMQQueue queue = binding.getQueue();
+
+ SlowConsumerDetectionQueueConfiguration config =
+ queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+ if (config != null)
+ {
+ _cache.add(queue);
+ }
+ else
+ {
+ _cache.remove(queue);
+ }
+ }
+
+ /**
+ * Lookup and return the cache of configured {@link AMQQueue}s.
+ *
+ * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
+ * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
+ * cache is returned.
+ *
+ * @return a copy of the cached {@link java.util.Set} of queues
+ */
+ public Set<AMQQueue> getQueueCache()
+ {
+ return new HashSet<AMQQueue>(_cache);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
new file mode 100644
index 0000000000..5c4fe0aab8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
+import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.plugins.Plugin;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
+
+public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
+{
+ private SlowConsumerDetectionConfiguration _config;
+ private ConfiguredQueueBindingListener _listener;
+
+ public static class SlowConsumerFactory implements VirtualHostPluginFactory
+ {
+ public SlowConsumerDetection newInstance(VirtualHost vhost)
+ {
+ SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
+
+ if (config == null)
+ {
+ return null;
+ }
+
+ SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
+ plugin.configure(config);
+ return plugin;
+ }
+ }
+
+ /**
+ * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
+ * cirtual host to record all the configured queues in a cache for processing by the housekeeping
+ * thread.
+ *
+ * @see Plugin#configure(ConfigurationPlugin)
+ */
+ public void configure(ConfigurationPlugin config)
+ {
+ _config = (SlowConsumerDetectionConfiguration) config;
+ _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
+ for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames())
+ {
+ getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+ }
+ }
+
+ public SlowConsumerDetection(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ public void execute()
+ {
+ CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
+
+ Set<AMQQueue> cache = _listener.getQueueCache();
+ for (AMQQueue q : cache)
+ {
+ CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
+
+ try
+ {
+ SlowConsumerDetectionQueueConfiguration config =
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+ if (checkQueueStatus(q, config))
+ {
+ config.getPolicy().performPolicy(q);
+ }
+ }
+ catch (Exception e)
+ {
+ // Don't throw exceptions as this will stop the house keeping task from running.
+ _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
+ }
+ }
+
+ CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
+ }
+
+ public long getDelay()
+ {
+ return _config.getDelay();
+ }
+
+ public TimeUnit getTimeUnit()
+ {
+ return _config.getTimeUnit();
+ }
+
+ /**
+ * Check the depth,messageSize,messageAge,messageCount values for this q
+ *
+ * @param q the queue to check
+ * @param config the queue configuration to compare against the queue state
+ *
+ * @return true if the queue has reached a threshold.
+ */
+ private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
+ {
+ if (config != null)
+ {
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+
+ int count = q.getMessageCount();
+
+ // First Check message counts
+ if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) ||
+ // The check queue depth
+ (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
+ // finally if we have messages on the queue check Arrival time.
+ // We must check count as OldestArrival time is Long.MAX_LONG when
+ // there are no messages.
+ (config.getMessageAge() != 0 &&
+ ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge())))
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
+ _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
+ _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
+ _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
+ }
+
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
new file mode 100644
index 0000000000..3798f47f0b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.concurrent.TimeUnit;
+
+public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin
+{
+ protected final Logger _logger = Logger.getLogger(getClass());
+
+ public VirtualHostHouseKeepingPlugin(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+
+ /**
+ * Long value representing the delay between repeats
+ *
+ * @return
+ */
+ public abstract long getDelay();
+
+ /**
+ * Option to specify what the delay value represents
+ *
+ * @return
+ *
+ * @see java.util.concurrent.TimeUnit for valid value.
+ */
+ public abstract TimeUnit getTimeUnit();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
new file mode 100644
index 0000000000..1886c2d01d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.server.plugins.Plugin;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public interface VirtualHostPlugin extends Runnable, Plugin
+{
+ /**
+ * Long value representing the delay between repeats
+ *
+ * @return
+ */
+ public long getDelay();
+
+ /**
+ * Option to specify what the delay value represents
+ * @see java.util.concurrent.TimeUnit for valid value.
+ * @return
+ */
+ public TimeUnit getTimeUnit();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java
new file mode 100644
index 0000000000..c8bea18444
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public interface VirtualHostPluginFactory
+{
+ public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties
new file mode 100644
index 0000000000..03c56910c2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/SlowConsumerDetection_logmessages.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Default File used for all non-defined locales.
+
+RUNNING = SCD-1001 : Running
+COMPLETE = SCD-1002 : Complete
+CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties
new file mode 100644
index 0000000000..ed4fb1d45a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/logging/TopicDeletePolicy_logmessages.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Default File used for all non-defined locales.
+
+DELETING_QUEUE = TDP-1001 : Deleting Queue
+DISCONNECTING = TDP-1002 : Disconnecting Session \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
new file mode 100644
index 0000000000..6028f63fdb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins.policies;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.plugins.logging.TopicDeletePolicyMessages;
+import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
+import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+
+public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
+{
+ Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+ private TopicDeletePolicyConfiguration _configuration;
+
+ public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory
+ {
+ public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
+ {
+ TopicDeletePolicyConfiguration config =
+ configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName());
+
+ TopicDeletePolicy policy = new TopicDeletePolicy();
+ policy.configure(config);
+ return policy;
+ }
+
+ public String getPluginName()
+ {
+ return "topicdelete";
+ }
+
+ public Class<TopicDeletePolicy> getPluginClass()
+ {
+ return TopicDeletePolicy.class;
+ }
+ }
+
+ public void performPolicy(AMQQueue q)
+ {
+ if (q == null)
+ {
+ return;
+ }
+
+ AMQSessionModel owner = q.getExclusiveOwningSession();
+
+ // Only process exclusive queues
+ if (owner == null)
+ {
+ return;
+ }
+
+ //Only process Topics
+ if (!validateQueueIsATopic(q))
+ {
+ return;
+ }
+
+ try
+ {
+ CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING());
+ // Close the consumer . this will cause autoDelete Queues to be purged
+ owner.getConnectionModel().
+ closeSession(owner, AMQConstant.RESOURCE_ERROR,
+ "Consuming to slow.");
+
+ // Actively delete non autoDelete queues if deletePersistent is set
+ if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent()))
+ {
+ CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE());
+ q.delete();
+ }
+
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
+ }
+
+ }
+
+ /**
+ * Check the queue bindings to validate the queue is bound to the
+ * topic exchange.
+ *
+ * @param q the Queue
+ *
+ * @return true iff Q is bound to a TopicExchange
+ */
+ private boolean validateQueueIsATopic(AMQQueue q)
+ {
+ for (Binding binding : q.getBindings())
+ {
+ if (binding.getExchange() instanceof TopicExchange)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void configure(ConfigurationPlugin config)
+ {
+ _configuration = (TopicDeletePolicyConfiguration) config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
new file mode 100644
index 0000000000..7dfd22c733
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins.policies;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+
+public class TopicDeletePolicyConfiguration extends ConfigurationPlugin
+{
+
+ public static class TopicDeletePolicyConfigurationFactory
+ implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path,
+ Configuration config)
+ throws ConfigurationException
+ {
+ TopicDeletePolicyConfiguration slowConsumerConfig =
+ new TopicDeletePolicyConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public List<String> getParentPaths()
+ {
+ return Arrays.asList(
+ "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete",
+ "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete");
+ }
+ }
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"delete-persistent"};
+ }
+
+ @Override
+ public void validateConfiguration() throws ConfigurationException
+ {
+ // No validation required.
+ }
+
+ public boolean deletePersistent()
+ {
+ // If we don't have configuration then we don't deletePersistent Queues
+ return (hasConfiguration() && contains("delete-persistent"));
+ }
+
+ @Override
+ public String formatToString()
+ {
+ return (deletePersistent()?"delete-durable":"");
+ }
+
+
+}