diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-10 00:11:27 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-10 00:11:27 +0000 |
commit | 5056e239f4fd0720a99610278c97c12f1d8b56dc (patch) | |
tree | 9c3411a346d5589f1fb649945e56c519007ce89f | |
parent | 283741d2680b4fa43e3c0e4e02621d1a7a24e37c (diff) | |
download | qpid-python-5056e239f4fd0720a99610278c97c12f1d8b56dc.tar.gz |
QPID-271 : (Patch supplied by Rob Godfrey) Implement fanout exchange
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494658 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 218 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 222cd2aef2..77f9819048 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -39,6 +42,8 @@ public class DefaultExchangeFactory implements ExchangeFactory _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); + } public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, @@ -48,7 +53,8 @@ public class DefaultExchangeFactory implements ExchangeFactory Class<? extends Exchange> exchClass = _exchangeClassMap.get(type); if (exchClass == null) { - throw new AMQException(_logger, "Unknown exchange type: " + type); + + throw new AMQUnknownExchangeType("Unknown exchange type: " + type); } try { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java new file mode 100644 index 0000000000..2e7457e4a6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -0,0 +1,206 @@ +package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class FanoutExchange extends AbstractExchange
+{
+ private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
+
+ /**
+ * Maps from queue name to queue instances
+ */
+ private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>();
+
+ /**
+ * MBean class implementing the management interfaces.
+ */
+ @MBeanDescription("Management Bean for Fanout Exchange")
+ private final class FanoutExchangeMBean extends ExchangeMBean
+ {
+ // open mbean data types for representing exchange bindings
+ private String[] _bindingItemNames = {"Routing Key", "Queue Names"};
+ private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
+ private OpenType[] _bindingItemTypes = new OpenType[2];
+ private CompositeType _bindingDataType = null;
+ private TabularType _bindinglistDataType = null;
+ private TabularDataSupport _bindingList = null;
+
+ @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+ public FanoutExchangeMBean() throws JMException
+ {
+ super();
+ _exchangeType = "fanout";
+ init();
+ }
+
+ /**
+ * initialises the OpenType objects.
+ */
+ private void init() throws OpenDataException
+ {
+ _bindingItemTypes[0] = SimpleType.STRING;
+ _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+ _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names",
+ _bindingItemNames, _bindingItemNames, _bindingItemTypes);
+ _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(),
+ _bindingDataType, _bindingItemIndexNames);
+ }
+
+ public TabularData bindings() throws OpenDataException
+ {
+
+ _bindingList = new TabularDataSupport(_bindinglistDataType);
+
+ for (AMQQueue queue : _queues)
+ {
+ String queueName = queue.getName().toString();
+
+
+
+ Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+ _bindingList.put(bindingData);
+ }
+
+ return _bindingList;
+ }
+
+ public void createNewBinding(String queueName, String binding) throws JMException
+ {
+ AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ if (queue == null)
+ {
+ throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+ }
+
+ try
+ {
+ registerQueue(new AMQShortString(binding), queue, null);
+ queue.bind(new AMQShortString(binding), FanoutExchange.this);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex);
+ }
+ }
+
+ }// End of MBean class
+
+
+ protected ExchangeMBean createMBean() throws AMQException
+ {
+ try
+ {
+ return new FanoutExchange.FanoutExchangeMBean();
+ }
+ catch (JMException ex)
+ {
+ _logger.error("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ }
+ }
+
+ public AMQShortString getType()
+ {
+ return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ }
+
+ public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ assert queue != null;
+
+ if (_queues.contains(queue))
+ {
+ _logger.debug("Queue " + queue + " is already registered");
+ }
+ else
+ {
+ _queues.add(queue);
+ _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
+ }
+ }
+
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ {
+ assert queue != null;
+ assert routingKey != null;
+
+ if (!_queues.remove(queue))
+ {
+ throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+ ". ");
+ }
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ final BasicPublishBody publishBody = payload.getPublishBody();
+ final AMQShortString routingKey = publishBody.routingKey;
+ if (_queues == null || _queues.isEmpty())
+ {
+ String msg = "No queues bound to " + this;
+ if (publishBody.mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publishing message to queue " + _queues);
+ }
+
+ for (AMQQueue q : _queues)
+ {
+ payload.enqueue(q);
+ }
+ }
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ {
+ return _queues.contains(queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey) throws AMQException
+ {
+
+ return _queues != null && !_queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+
+
+ return _queues.contains(queue);
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_queues.isEmpty();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index d4881aefaf..e1fac55d3b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -32,6 +32,7 @@ public class ExchangeInitialiser define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 729cdb871e..57159f3802 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -35,4 +35,8 @@ public class ExchangeDefaults public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); + + public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); + + public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); } |