diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-08-06 13:06:35 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-08-06 13:06:35 +0000 |
commit | cebc6af8f10ab4058de3065a8948f32da8d85811 (patch) | |
tree | 82dd84f571e7f32c79f3adbb813d3be4ddc3bc53 | |
parent | 7608aa32f97193b2c329c5ea4c9a51f77b6f2fc0 (diff) | |
download | qpid-python-cebc6af8f10ab4058de3065a8948f32da8d85811.tar.gz |
QPID-543 : Add ability to register cusom exchange types
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563125 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 205 insertions, 47 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 868ac31a54..246de230ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -49,7 +49,7 @@ public abstract class AbstractExchange implements Exchange, Managable protected boolean _durable; protected String _exchangeType; - protected int _ticket; + private VirtualHost _virtualHost; @@ -114,11 +114,6 @@ public abstract class AbstractExchange implements Exchange, Managable return _exchangeType; } - public Integer getTicketNo() - { - return _ticket; - } - public boolean isDurable() { return _durable; @@ -155,13 +150,12 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract ExchangeMBean createMBean() throws AMQException; - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException + public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete) throws AMQException { _virtualHost = host; _name = name; _durable = durable; _autoDelete = autoDelete; - _ticket = ticket; _exchangeMbean = createMBean(); _exchangeMbean.register(); } @@ -176,11 +170,6 @@ public abstract class AbstractExchange implements Exchange, Managable return _autoDelete; } - public int getTicket() - { - return _ticket; - } - public void close() throws AMQException { if (_exchangeMbean != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 39a5bba8b7..e3b715d45f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; +import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.exchange.ExchangeDefaults; @@ -34,42 +36,66 @@ public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); - private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>(); + private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>(); private final VirtualHost _host; public DefaultExchangeFactory(VirtualHost host) { _host = host; - _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); - _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); - _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); - _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); + registerExchangeType(DestNameExchange.TYPE); + registerExchangeType(DestWildExchange.TYPE); + registerExchangeType(HeadersExchange.TYPE); + registerExchangeType(FanoutExchange.TYPE); + + } + public void registerExchangeType(ExchangeType<? extends Exchange> type) + { + _exchangeClassMap.put(type.getName(), type); } public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException { - Class<? extends Exchange> exchClass = _exchangeClassMap.get(type); - if (exchClass == null) + ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type); + if (exchType == null) { throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null); } - try - { - Exchange e = exchClass.newInstance(); - e.initialise(_host, exchange, durable, ticket, autoDelete); - return e; - } - catch (InstantiationException e) - { - throw new AMQException(null, "Unable to create exchange: " + e, e); - } - catch (IllegalAccessException e) + Exchange e = exchType.newInstance(_host, exchange, durable, autoDelete); + return e; + } + + public void initialise(Configuration hostConfig) + { + for(Object className : hostConfig.getList("custom-exchanges.class-name")) { - throw new AMQException(null, "Unable to create exchange: " + e, e); + try + { + Class<? extends ExchangeType> exchangeTypeClass = (Class<? extends ExchangeType>) Class.forName(String.valueOf(className)); + ExchangeType type = exchangeTypeClass.newInstance(); + registerExchangeType(type); + + } + catch (ClassNotFoundException e) + { + _logger.error("No such custom exchange class found: \""+String.valueOf(className)+"\""); + } + catch (ClassCastException classCastEx) + { + _logger.error("No custom exchange class: \""+String.valueOf(className)+"\" cannot be registered as it does not extend class \""+ExchangeType.class+"\""); + } + catch (IllegalAccessException e) + { + _logger.error("Cannot create custom exchange class: \""+String.valueOf(className)+"\"",e); + } + catch (InstantiationException e) + { + _logger.error("Cannot create custom exchange class: \""+String.valueOf(className)+"\"",e); + } } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 0dcceaddbb..6177980b92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; public class DestNameExchange extends AbstractExchange { @@ -58,6 +59,30 @@ public class DestNameExchange extends AbstractExchange */ private final Index _index = new Index(); + public static final ExchangeType<DestNameExchange> TYPE = new ExchangeType<DestNameExchange>() + { + + public AMQShortString getName() + { + return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + } + + public Class<DestNameExchange> getExchangeClass() + { + return DestNameExchange.class; + } + + public DestNameExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException + { + DestNameExchange exch = new DestNameExchange(); + exch.initialise(host,name,durable,autoDelete); + return exch; + } + }; + /** * MBean class implementing the management interfaces. */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index f6a95b5e55..20f0517789 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import javax.management.MBeanException; @@ -56,6 +57,32 @@ import java.util.concurrent.CopyOnWriteArrayList; public class DestWildExchange extends AbstractExchange { + + public static final ExchangeType<DestWildExchange> TYPE = new ExchangeType<DestWildExchange>() + { + + public AMQShortString getName() + { + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + public Class<DestWildExchange> getExchangeClass() + { + return DestWildExchange.class; + } + + public DestWildExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException + { + DestWildExchange exch = new DestWildExchange(); + exch.initialise(host,name,durable,autoDelete); + return exch; + } + }; + + private static final Logger _logger = Logger.getLogger(DestWildExchange.class); private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5f77cc2a4..03b264f8fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -32,7 +32,7 @@ public interface Exchange AMQShortString getName(); AMQShortString getType(); - void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; + boolean isDurable(); @@ -41,8 +41,6 @@ public interface Exchange */ boolean isAutoDelete(); - int getTicket(); - void close() throws AMQException; void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index e07fd0b8fc..b7b88b9157 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -29,4 +31,6 @@ public interface ExchangeFactory Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException; + + void initialise(Configuration hostConfig); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index d3a466565f..0003b8302f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java new file mode 100644 index 0000000000..472de2da5c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java @@ -0,0 +1,34 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+
+public interface ExchangeType<T extends Exchange>
+{
+ public AMQShortString getName();
+ public Class<T> getExchangeClass();
+ public T newInstance(VirtualHost host, AMQShortString name,
+ boolean durable, boolean autoDelete) throws AMQException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index bf00eeb9d3..8895539538 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -49,8 +50,36 @@ import java.util.concurrent.CopyOnWriteArraySet; public class FanoutExchange extends AbstractExchange
{
+
+
private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
+
+ public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
+ {
+
+ public AMQShortString getName()
+ {
+ return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ }
+
+ public Class<FanoutExchange> getExchangeClass()
+ {
+ return FanoutExchange.class;
+ }
+
+ public FanoutExchange newInstance(VirtualHost host,
+ AMQShortString name,
+ boolean durable,
+ boolean autoDelete) throws AMQException
+ {
+ FanoutExchange exch = new FanoutExchange();
+ exch.initialise(host,name,durable,autoDelete);
+ return exch;
+ }
+ };
+
+
/**
* Maps from queue name to queue instances
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index e86094e26f..bed08daeaf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * An exchange that binds queues based on a set of required headers and header values @@ -81,6 +82,33 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); + + + public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>() + { + + public AMQShortString getName() + { + return ExchangeDefaults.HEADERS_EXCHANGE_CLASS; + } + + public Class<HeadersExchange> getExchangeClass() + { + return HeadersExchange.class; + } + + public HeadersExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + boolean autoDelete) throws AMQException + { + HeadersExchange exch = new HeadersExchange(); + exch.initialise(host,name,durable,autoDelete); + return exch; + } + }; + + private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>(); /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java index 5d6d68b3c8..bb33341aef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java @@ -52,9 +52,6 @@ public interface ManagedExchange @MBeanAttribute(name="ExchangeType", description="Exchange Type") String getExchangeType() throws IOException; - @MBeanAttribute(name="TicketNo", description="Exchange Ticket No") - Integer getTicketNo() throws IOException; - /** * Tells if the exchange is durable or not. * @return true if the exchange is durable. diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 6638689299..6518d8b765 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -146,6 +146,7 @@ public class VirtualHost implements Accessable _queueRegistry = new DefaultQueueRegistry(this); _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); _exchangeRegistry = new DefaultExchangeRegistry(this); if (store != null) @@ -164,6 +165,7 @@ public class VirtualHost implements Accessable _exchangeRegistry.initialise(); + _logger.warn("VirtualHost authentication Managers require spec change to be operational."); _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index 9653155a51..10450f880f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -48,7 +48,7 @@ public class ExchangeMBeanTest extends TestCase public void testDirectExchangeMBean() throws Exception { DestNameExchange exchange = new DestNameExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -62,7 +62,6 @@ public class ExchangeMBeanTest extends TestCase // test general exchange properties assertEquals(mbean.getName(), "amq.direct"); assertEquals(mbean.getExchangeType(), "direct"); - assertTrue(mbean.getTicketNo() == 0); assertTrue(!mbean.isDurable()); assertTrue(mbean.isAutoDelete()); } @@ -75,7 +74,7 @@ public class ExchangeMBeanTest extends TestCase public void testTopicExchangeMBean() throws Exception { DestWildExchange exchange = new DestWildExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -89,7 +88,6 @@ public class ExchangeMBeanTest extends TestCase // test general exchange properties assertEquals(mbean.getName(), "amq.topic"); assertEquals(mbean.getExchangeType(), "topic"); - assertTrue(mbean.getTicketNo() == 0); assertTrue(!mbean.isDurable()); assertTrue(mbean.isAutoDelete()); } @@ -102,7 +100,7 @@ public class ExchangeMBeanTest extends TestCase public void testHeadersExchangeMBean() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -116,7 +114,6 @@ public class ExchangeMBeanTest extends TestCase // test general exchange properties assertEquals(mbean.getName(), "amq.match"); assertEquals(mbean.getExchangeType(), "headers"); - assertTrue(mbean.getTicketNo() == 0); assertTrue(!mbean.isDurable()); assertTrue(mbean.isAutoDelete()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 0ee4882ec2..3691e80234 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -138,7 +138,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _channelId = channelId; _session = session; _producerId = producerId; - if (destination != null) + if (destination != null && !(destination instanceof AMQUndefinedDestination)) { declareDestination(destination); } @@ -150,7 +150,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j void resubscribe() throws AMQException { - if (_destination != null) + if (_destination != null && !(_destination instanceof AMQUndefinedDestination)) { declareDestination(_destination); } diff --git a/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java b/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java index 6b0394f1a6..35f692f23c 100644 --- a/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java +++ b/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java @@ -79,7 +79,7 @@ public class ManagementConsoleTest extends TestCase // If this test fails due to changes in the broker code, // then the constants in the Constants.java shoule be updated accordingly DestNameExchange exchange = new DestNameExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, true); AMQManagedObject mbean = (AMQManagedObject)exchange.getManagedObject(); MBeanInfo mbeanInfo = mbean.getMBeanInfo(); |