diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 16:24:00 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 16:24:00 +0000 |
commit | 3b38933380a6583004a8b19511a6e056114dd811 (patch) | |
tree | ccde7fd3825fcf6375d65e15718d5fd7e2300fa6 | |
parent | 8388c3451fc06cc5c752381155ff7b898f135268 (diff) | |
download | qpid-python-3b38933380a6583004a8b19511a6e056114dd811.tar.gz |
Revision: 494658
Author: rgreig
Date: 00:11:27, 10 January 2007
Message:
QPID-271 : (Patch supplied by Rob Godfrey) Implement fanout exchange
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
Revision: 494655
Author: rgreig
Date: 23:46:48, 09 January 2007
Message:
QPID-270 : (Patch supplied by Rob Godfrey) Change of use of AMQConnectionException
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
Revision: 494652
Author: rgreig
Date: 23:36:50, 09 January 2007
Message:
QPID-269 : (Patch supplied by Rob Godfrey) Add getType() to Exchange
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
494650 - PARTIAL
Revision: 494650
Author: rgreig
Date: 23:22:52, 09 January 2007
Message:
QPID-268 : (Patch supplied by Rob Godfrey) Improvements to performance of generated code
----
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@499828 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 306 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 0c73e0f9f0..06d28abef1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -22,6 +22,10 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.exchange.ExchangeDefaults; import java.util.HashMap; import java.util.Map; @@ -34,9 +38,10 @@ public class DefaultExchangeFactory implements ExchangeFactory public DefaultExchangeFactory() { - _exchangeClassMap.put("direct", org.apache.qpid.server.exchange.DestNameExchange.class); - _exchangeClassMap.put("topic", org.apache.qpid.server.exchange.DestWildExchange.class); - _exchangeClassMap.put("headers", org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); + _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); + _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); } public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete, @@ -46,7 +51,7 @@ public class DefaultExchangeFactory implements ExchangeFactory Class<? extends Exchange> exchClass = _exchangeClassMap.get(type); if (exchClass == null) { - throw new AMQException(_logger, "Unknown exchange type: " + type); + throw new AMQUnknownExchangeType("Unknown exchange type: " + type); } try { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index d4069fa315..3ebf05ab09 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanConstructor; @@ -140,6 +141,11 @@ public class DestNameExchange extends AbstractExchange } } + public String getType() + { + return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + } + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 139307488e..4039c3a29f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanConstructor; @@ -125,6 +126,10 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class + public String getType() + { + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 824e85dc5c..9fcbe2a871 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.queue.AMQMessage; public interface Exchange { String getName(); + String getType(); void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java new file mode 100644 index 0000000000..150bcb3582 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -0,0 +1,201 @@ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +import javax.management.openmbean.*; +import javax.management.JMException; +import javax.management.MBeanException; +import java.util.concurrent.CopyOnWriteArraySet; + +public class FanoutExchange extends AbstractExchange +{ + private static final Logger _logger = Logger.getLogger(FanoutExchange.class); + + /** + * Maps from queue name to queue instances + */ + private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>(); + + /** + * MBean class implementing the management interfaces. + */ + @MBeanDescription("Management Bean for Fanout Exchange") + private final class FanoutExchangeMBean extends ExchangeMBean + { + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private OpenType[] _bindingItemTypes = new OpenType[2]; + private CompositeType _bindingDataType = null; + private TabularType _bindinglistDataType = null; + private TabularDataSupport _bindingList = null; + + @MBeanConstructor("Creates an MBean for AMQ fanout exchange") + public FanoutExchangeMBean() throws JMException + { + super(); + _exchangeType = "fanout"; + init(); + } + + /** + * initialises the OpenType objects. + */ + private void init() throws OpenDataException + { + _bindingItemTypes[0] = SimpleType.STRING; + _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); + } + + public TabularData bindings() throws OpenDataException + { + + _bindingList = new TabularDataSupport(_bindinglistDataType); + + for (AMQQueue queue : _queues) + { + String queueName = queue.getName().toString(); + + + + Object[] bindingItemValues = {queueName, new String[] {queueName}}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + registerQueue(binding, queue, null); + queue.bind(binding, FanoutExchange.this); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + }// End of MBean class + + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new FanoutExchange.FanoutExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + } + } + + public String getType() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (_queues.contains(queue)) + { + _logger.debug("Queue " + queue + " is already registered"); + } + else + { + _queues.add(queue); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + } + } + + public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + { + assert queue != null; + assert routingKey != null; + + if (!_queues.remove(queue)) + { + throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + ". "); + } + } + + public void route(AMQMessage payload) throws AMQException + { + final BasicPublishBody publishBody = payload.getPublishBody(); + final String routingKey = publishBody.routingKey; + if (_queues == null || _queues.isEmpty()) + { + String msg = "No queues bound to " + this; + if (publishBody.mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Publishing message to queue " + _queues); + } + + for (AMQQueue q : _queues) + { + payload.enqueue(q); + } + } + } + + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + return _queues.contains(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + + return _queues != null && !_queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + + + return _queues.contains(queue); + } + + public boolean hasBindings() throws AMQException + { + return !_queues.isEmpty(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 229502d2a6..e4696be932 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -173,6 +174,11 @@ public class HeadersExchange extends AbstractExchange } // End of MBean class + public String getType() + { + return ExchangeDefaults.HEADERS_EXCHANGE_CLASS; + } + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index d3ec70456f..e13e4fec37 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -31,6 +31,7 @@ public class ExchangeInitialiser define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 034cf3a1eb..7903ef4d10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -21,7 +21,7 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; @@ -274,8 +274,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect message = "Unable to Connect"; } - AMQException e = new AMQConnectionException(message); - + AMQException e = new AMQConnectionFailureException(message); + if (lastException != null) { if (lastException instanceof UnresolvedAddressException) diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index c5aa62032c..b3dd4aca64 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -24,8 +24,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQConnectionFailureException; import javax.jms.Connection; @@ -88,7 +88,7 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { - if (!(amqe instanceof AMQConnectionException)) + if (!(amqe instanceof AMQConnectionFailureException)) { fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 6254d80f32..6f47361277 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,10 +21,38 @@ package org.apache.qpid; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ConnectionCloseBody; + public class AMQConnectionException extends AMQException { - public AMQConnectionException(String message) + private final int _classId; + private final int _methodId; + /* AMQP version for which exception ocurred */ + private final byte major; + private final byte minor; + + public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + { + super(errorCode, msg, t); + _classId = classId; + _methodId = methodId; + this.major = major; + this.minor = minor; + } + + public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) { - super(message); + super(errorCode, msg); + _classId = classId; + _methodId = methodId; + this.major = major; + this.minor = minor; } + + public AMQFrame getCloseFrame(int channel) + { + return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage()); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java new file mode 100644 index 0000000000..a09f73dc42 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +public class AMQConnectionFailureException extends AMQException +{ + public AMQConnectionFailureException(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java new file mode 100644 index 0000000000..70f333a580 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -0,0 +1,9 @@ +package org.apache.qpid; + +public class AMQUnknownExchangeType extends AMQException +{ + public AMQUnknownExchangeType(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index e67a5ba7fe..dca8075f7f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -33,4 +33,8 @@ public class ExchangeDefaults public final static String HEADERS_EXCHANGE_NAME = "amq.match"; public final static String HEADERS_EXCHANGE_CLASS = "headers"; + + public final static String FANOUT_EXCHANGE_NAME = "amq.fanout"; + + public final static String FANOUT_EXCHANGE_CLASS = "fanout"; } |