diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 5859ce3c68..9a23e00f90 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -47,8 +47,13 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageNode; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; @@ -99,6 +104,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final DtxRegistry _dtxRegistry; private final AMQQueueFactory _queueFactory; + private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry(); private volatile State _state = State.INITIALISING; @@ -107,6 +113,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); private boolean _blocked; + private final Map<String, MessageDestination> _systemNodeDestinations = + Collections.synchronizedMap(new HashMap<String,MessageDestination>()); + + private final Map<String, MessageSource> _systemNodeSources = + Collections.synchronizedMap(new HashMap<String,MessageSource>()); + + public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, @@ -149,6 +162,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry); + registerSystemNodes(); + initialiseStatistics(); initialiseStorage(hostConfig, virtualHost); @@ -157,6 +172,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } + private void registerSystemNodes() + { + QpidServiceLoader<SystemNodeCreator> qpidServiceLoader = new QpidServiceLoader<SystemNodeCreator>(); + Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class); + for(SystemNodeCreator creator : factories) + { + creator.register(_systemNodeRegistry); + } + } + abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig, org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception; @@ -441,6 +466,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override + public MessageSource getMessageSource(final String name) + { + MessageSource systemSource = _systemNodeSources.get(name); + return systemSource == null ? getQueue(name) : systemSource; + } + + @Override public AMQQueue getQueue(UUID id) { return _queueRegistry.getQueue(id); @@ -524,6 +556,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } + + @Override + public MessageDestination getMessageDestination(final String name) + { + MessageDestination destination = _systemNodeDestinations.get(name); + return destination == null ? getExchange(name) : destination; + } + @Override public Exchange getExchange(String name) { @@ -927,4 +967,39 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } } + + private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry + { + @Override + public void registerSystemNode(final MessageNode node) + { + if(node instanceof MessageDestination) + { + _systemNodeDestinations.put(node.getName(), (MessageDestination) node); + } + if(node instanceof MessageSource) + { + _systemNodeSources.put(node.getName(), (MessageSource)node); + } + } + + @Override + public void removeSystemNode(final MessageNode node) + { + if(node instanceof MessageDestination) + { + _systemNodeDestinations.remove(node.getName()); + } + if(node instanceof MessageSource) + { + _systemNodeSources.remove(node.getName()); + } + } + + @Override + public VirtualHost getVirtualHost() + { + return AbstractVirtualHost.this; + } + } } |