summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-05 16:37:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-05 16:37:22 +0000
commit00b7d74269454f15cbe90d790e653887c67bc7cb (patch)
tree2ee8f381bebbfa166f6bd1c28b721d9e9d60f044
parent40f30a00fba4e838c903163054bb4f80f79882e2 (diff)
downloadqpid-python-00b7d74269454f15cbe90d790e653887c67bc7cb.tar.gz
Allow plugins to define system nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564825 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java26
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java34
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java57
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
6 files changed, 119 insertions, 4 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index c6eb8b2a2b..78e2ab9a15 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-public interface MessageDestination
+public interface MessageDestination extends MessageNode
{
public String getName();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
new file mode 100644
index 0000000000..f4b751d2fd
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageNode
+{
+ String getName();
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 1abe3671ff..c29b9786df 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -32,7 +32,7 @@ import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.EnumSet;
-public interface MessageSource extends TransactionLogResource
+public interface MessageSource extends TransactionLogResource, MessageNode
{
Consumer addConsumer(ConsumerTarget target, FilterManager filters,
Class<? extends ServerMessage> messageClass,
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
new file mode 100644
index 0000000000..219332a413
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import org.apache.qpid.server.message.MessageNode;
+
+public interface SystemNodeCreator extends Pluggable
+{
+ interface SystemNodeRegistry
+ {
+ void registerSystemNode(MessageNode node);
+ void removeSystemNode(MessageNode node);
+ }
+
+ void register(SystemNodeRegistry registry);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 99b7407bde..948afea4a3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -48,9 +48,12 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -101,6 +104,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final DtxRegistry _dtxRegistry;
private final AMQQueueFactory _queueFactory;
+ private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
private volatile State _state = State.INITIALISING;
@@ -109,6 +113,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
private boolean _blocked;
+ private final Map<String, MessageDestination> _systemNodeDestinations =
+ Collections.synchronizedMap(new HashMap<String,MessageDestination>());
+
+ private final Map<String, MessageSource> _systemNodeSources =
+ Collections.synchronizedMap(new HashMap<String,MessageSource>());
+
+
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
@@ -151,6 +162,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
+ registerSystemNodes();
+
initialiseStatistics();
initialiseStorage(hostConfig, virtualHost);
@@ -159,6 +172,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
+ private void registerSystemNodes()
+ {
+ QpidServiceLoader<SystemNodeCreator> qpidServiceLoader = new QpidServiceLoader<SystemNodeCreator>();
+ Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
+ for(SystemNodeCreator creator : factories)
+ {
+ creator.register(_systemNodeRegistry);
+ }
+ }
+
abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
@@ -445,7 +468,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
@Override
public MessageSource getMessageSource(final String name)
{
- return getQueue(name);
+ MessageSource systemSource = _systemNodeSources.get(name);
+ return systemSource == null ? getQueue(name) : systemSource;
}
@Override
@@ -536,7 +560,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
@Override
public MessageDestination getMessageDestination(final String name)
{
- return getExchange(name);
+ MessageDestination destination = _systemNodeDestinations.get(name);
+ return destination == null ? getExchange(name) : destination;
}
@Override
@@ -942,4 +967,32 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
}
+
+ private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
+ {
+ @Override
+ public void registerSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.put(node.getName(), (MessageSource)node);
+ }
+ }
+
+ public void removeSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.remove(node.getName());
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.remove(node.getName());
+ }
+ }
+ }
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 0c1b949e62..7034311d84 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -80,6 +81,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
void removeExchange(Exchange exchange, boolean force) throws AMQException;
MessageDestination getMessageDestination(String name);
+
Exchange getExchange(String name);
Exchange getExchange(UUID id);