diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 09:26:47 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 09:26:47 +0000 |
commit | 417e8975c58bfa1280c9a180493ada61c1cb7097 (patch) | |
tree | 38bc26ca12c717dd68aa0086e50a900d590d8020 | |
parent | bc77d792e85c348163d24a609f05f3772a7ae950 (diff) | |
download | qpid-python-417e8975c58bfa1280c9a180493ada61c1cb7097.tar.gz |
QPID-538 Check to ensure a duplicate binding is not created.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556861 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 61 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index a632720ebe..5edffc19ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -187,19 +187,24 @@ public class DestNameExchange extends AbstractExchange } } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return isBound(routingKey,queue); + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) { final List<AMQQueue> queues = _index.get(routingKey); return queues != null && queues.contains(queue); } - public boolean isBound(AMQShortString routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) { final List<AMQQueue> queues = _index.get(routingKey); return queues != null && !queues.isEmpty(); } - public boolean isBound(AMQQueue queue) throws AMQException + public boolean isBound(AMQQueue queue) { Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap(); for (List<AMQQueue> queues : bindings.values()) @@ -212,7 +217,7 @@ public class DestNameExchange extends AbstractExchange return false; } - public boolean hasBindings() throws AMQException + public boolean hasBindings() { return !_index.getBindingsMap().isEmpty(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index d88dcb11f7..b55dbcc792 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -230,21 +230,26 @@ public class DestWildExchange extends AbstractExchange } } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return isBound(routingKey, queue); + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return (queues != null) && queues.contains(queue); } - public boolean isBound(AMQShortString routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return (queues != null) && !queues.isEmpty(); } - public boolean isBound(AMQQueue queue) throws AMQException + public boolean isBound(AMQQueue queue) { for (List<AMQQueue> queues : _routingKey2queues.values()) { @@ -257,7 +262,7 @@ public class DestWildExchange extends AbstractExchange return false; } - public boolean hasBindings() throws AMQException + public boolean hasBindings() { return !_routingKey2queues.isEmpty(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 9fa7b56ba7..37cd85a8f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.Map; import java.util.List; +import java.util.Map; public interface Exchange { @@ -55,6 +55,17 @@ public interface Exchange void route(AMQMessage message) throws AMQException; + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments + * @param routingKey + * @param arguments + * @param queue + * @return + * @throws AMQException + */ + boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue); + /** * Determines whether a message would be isBound to a particular queue using a specific routing key * @param routingKey @@ -62,7 +73,7 @@ public interface Exchange * @return * @throws AMQException */ - boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException; + boolean isBound(AMQShortString routingKey, AMQQueue queue); /** * Determines whether a message is routing to any queue using a specific _routing key @@ -70,16 +81,17 @@ public interface Exchange * @return * @throws AMQException */ - boolean isBound(AMQShortString routingKey) throws AMQException; + boolean isBound(AMQShortString routingKey); - boolean isBound(AMQQueue queue) throws AMQException; + boolean isBound(AMQQueue queue); /** * Returns true if this exchange has at least one binding associated with it. * @return * @throws AMQException */ - boolean hasBindings() throws AMQException; + boolean hasBindings(); Map<AMQShortString, List<AMQQueue>> getBindings(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index a65b105809..aa13f1d8ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -180,24 +180,29 @@ public class FanoutExchange extends AbstractExchange }
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return (_queues != null) && !_queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_queues.isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index bcbca8becf..3544e9d1f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -241,17 +241,23 @@ public class HeadersExchange extends AbstractExchange } } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + //fixme isBound here should take the arguements in to consideration. + return isBound(routingKey, queue); + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) { return isBound(queue); } - public boolean isBound(AMQShortString routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) { return hasBindings(); } - public boolean isBound(AMQQueue queue) throws AMQException + public boolean isBound(AMQQueue queue) { for (Registration r : _bindings) { @@ -263,7 +269,7 @@ public class HeadersExchange extends AbstractExchange return false; } - public boolean hasBindings() throws AMQException + public boolean hasBindings() { return !_bindings.isEmpty(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 4dc67b1970..2b751ab692 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -36,7 +37,6 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -77,7 +77,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } - + if (body.routingKey == null) { body.routingKey = queue.getName(); @@ -98,8 +98,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); } try - { - queue.bind(body.routingKey, body.arguments, exch); + { + if (!exch.isBound(body.routingKey, body.arguments, queue)) + { + queue.bind(body.routingKey, body.arguments, exch); + } } catch (AMQInvalidRoutingKeyException rke) { |