summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java75
1 files changed, 75 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 5859ce3c68..9a23e00f90 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -47,8 +47,13 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -99,6 +104,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final DtxRegistry _dtxRegistry;
private final AMQQueueFactory _queueFactory;
+ private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
private volatile State _state = State.INITIALISING;
@@ -107,6 +113,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
private boolean _blocked;
+ private final Map<String, MessageDestination> _systemNodeDestinations =
+ Collections.synchronizedMap(new HashMap<String,MessageDestination>());
+
+ private final Map<String, MessageSource> _systemNodeSources =
+ Collections.synchronizedMap(new HashMap<String,MessageSource>());
+
+
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
@@ -149,6 +162,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
+ registerSystemNodes();
+
initialiseStatistics();
initialiseStorage(hostConfig, virtualHost);
@@ -157,6 +172,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
+ private void registerSystemNodes()
+ {
+ QpidServiceLoader<SystemNodeCreator> qpidServiceLoader = new QpidServiceLoader<SystemNodeCreator>();
+ Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
+ for(SystemNodeCreator creator : factories)
+ {
+ creator.register(_systemNodeRegistry);
+ }
+ }
+
abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
@@ -441,6 +466,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ MessageSource systemSource = _systemNodeSources.get(name);
+ return systemSource == null ? getQueue(name) : systemSource;
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return _queueRegistry.getQueue(id);
@@ -524,6 +556,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ MessageDestination destination = _systemNodeDestinations.get(name);
+ return destination == null ? getExchange(name) : destination;
+ }
+
@Override
public Exchange getExchange(String name)
{
@@ -927,4 +967,39 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
}
+
+ private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
+ {
+ @Override
+ public void registerSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.put(node.getName(), (MessageSource)node);
+ }
+ }
+
+ @Override
+ public void removeSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.remove(node.getName());
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.remove(node.getName());
+ }
+ }
+
+ @Override
+ public VirtualHost getVirtualHost()
+ {
+ return AbstractVirtualHost.this;
+ }
+ }
}