summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java173
1 files changed, 32 insertions, 141 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index d9dc0aa64e..dd3610373f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -35,12 +34,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.BrokerConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfigType;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -49,7 +44,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -61,17 +55,16 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreCreator;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
@@ -79,23 +72,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
- private final UUID _qmfId;
-
private final String _name;
private final UUID _id;
private final long _createTime = System.currentTimeMillis();
- private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
-
private final ScheduledThreadPoolExecutor _houseKeepingTasks;
- private final IApplicationRegistry _appRegistry;
+ private final VirtualHostRegistry _virtualHostRegistry;
- private final SecurityManager _securityManager;
+ private final StatisticsGatherer _brokerStatisticsGatherer;
- private final BrokerConfig _brokerConfig;
+ private final SecurityManager _securityManager;
private final VirtualHostConfiguration _vhostConfig;
@@ -120,7 +109,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
private boolean _blocked;
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
+ public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception
{
if (hostConfig == null)
{
@@ -132,19 +121,17 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost.");
}
- _appRegistry = appRegistry;
- _brokerConfig = _appRegistry.getBrokerConfig();
+ _virtualHostRegistry = virtualHostRegistry;
+ _brokerStatisticsGatherer = brokerStatisticsGatherer;
_vhostConfig = hostConfig;
_name = _vhostConfig.getName();
_dtxRegistry = new DtxRegistry();
- _qmfId = _appRegistry.getConfigStore().createId();
_id = UUIDGenerator.generateVhostUUID(_name);
CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
- _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
- _securityManager.configureHostPlugins(_vhostConfig);
+ _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"));
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
@@ -154,13 +141,12 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(_vhostConfig);
_exchangeRegistry = new DefaultExchangeRegistry(this);
_bindingFactory = new BindingFactory(this);
- _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
+ _messageStore = initialiseMessageStore(hostConfig);
configureMessageStore(hostConfig);
@@ -187,22 +173,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return _id;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public VirtualHostConfigType getConfigType()
- {
- return VirtualHostConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getBroker();
- }
-
public boolean isDurable()
{
return false;
@@ -216,38 +186,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
*/
private void initialiseHouseKeeping(long period)
{
-
if (period != 0L)
{
scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
-
- Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins();
-
- if (plugins != null)
- {
- for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet())
- {
- String pluginName = entry.getKey();
- VirtualHostPluginFactory factory = entry.getValue();
- try
- {
- VirtualHostPlugin plugin = factory.newInstance(this);
-
- // If we had configuration for the plugin the schedule it.
- if (plugin != null)
- {
- _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2,
- plugin.getDelay(), plugin.getTimeUnit());
-
- _logger.info("Loaded VirtualHostPlugin:" + plugin);
- }
- }
- catch (RuntimeException e)
- {
- _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e);
- }
- }
- }
}
}
@@ -329,19 +270,34 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
if (!(o instanceof MessageStore))
{
- throw new ClassCastException("Message store factory class must implement " + MessageStore.class +
+ throw new ClassCastException("Message store class must implement " + MessageStore.class +
". Class " + clazz + " does not.");
}
final MessageStore messageStore = (MessageStore) o;
- final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName());
+ return messageStore;
+ }
+
+ private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ String storeType = hostConfig.getConfig().getString("store.type");
+ MessageStore messageStore = null;
+ if (storeType == null)
+ {
+ messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
+ }
+ else
+ {
+ messageStore = new MessageStoreCreator().createMessageStore(storeType);
+ }
+
+ final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
-
return messageStore;
}
@@ -468,16 +424,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return _name;
}
- public BrokerConfig getBroker()
- {
- return _brokerConfig;
- }
-
- public String getFederationTag()
- {
- return _brokerConfig.getFederationTag();
- }
-
public long getCreateTime()
{
return _createTime;
@@ -534,14 +480,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
CurrentActor.get().message(VirtualHostMessages.CLOSED());
}
- public UUID getBrokerId()
- {
- return _appRegistry.getBrokerId();
- }
-
- public IApplicationRegistry getApplicationRegistry()
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _appRegistry;
+ return _virtualHostRegistry;
}
public BindingFactory getBindingFactory()
@@ -553,14 +494,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
{
_messagesDelivered.registerEvent(1L);
_dataDelivered.registerEvent(messageSize);
- _appRegistry.registerMessageDelivered(messageSize);
+ _brokerStatisticsGatherer.registerMessageDelivered(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
_messagesReceived.registerEvent(1L, timestamp);
_dataReceived.registerEvent(messageSize, timestamp);
- _appRegistry.registerMessageReceived(messageSize, timestamp);
+ _brokerStatisticsGatherer.registerMessageReceived(messageSize, timestamp);
}
public StatisticsCounter getMessageReceiptStatistics()
@@ -604,51 +545,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_dataReceived = new StatisticsCounter("bytes-received-" + getName());
}
- public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments)
- {
- BrokerLink blink = new BrokerLink(this, id, createTime, arguments);
- // TODO - cope with duplicate broker link creation requests
- _links.putIfAbsent(blink,blink);
- getConfigStore().addConfiguredObject(blink);
- return blink;
- }
-
- public void createBrokerConnection(final String transport,
- final String host,
- final int port,
- final String vhost,
- final boolean durable,
- final String authMechanism,
- final String username,
- final String password)
- {
- BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
-
- // TODO - cope with duplicate broker link creation requests
- _links.putIfAbsent(blink,blink);
- getConfigStore().addConfiguredObject(blink);
-
- }
-
- public void removeBrokerConnection(final String transport,
- final String host,
- final int port,
- final String vhost)
- {
- removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null));
-
- }
-
- public void removeBrokerConnection(BrokerLink blink)
- {
- blink = _links.get(blink);
- if(blink != null)
- {
- blink.close();
- getConfigStore().removeConfiguredObject(blink);
- }
- }
-
public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
{
LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
@@ -660,11 +556,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return linkRegistry;
}
- public ConfigStore getConfigStore()
- {
- return getApplicationRegistry().getConfigStore();
- }
-
public DtxRegistry getDtxRegistry()
{
return _dtxRegistry;