summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-17 09:26:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-17 09:26:47 +0000
commit417e8975c58bfa1280c9a180493ada61c1cb7097 (patch)
tree38bc26ca12c717dd68aa0086e50a900d590d8020
parentbc77d792e85c348163d24a609f05f3772a7ae950 (diff)
downloadqpid-python-417e8975c58bfa1280c9a180493ada61c1cb7097.tar.gz
QPID-538 Check to ensure a duplicate binding is not created.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556861 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java11
6 files changed, 61 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index a632720ebe..5edffc19ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -187,19 +187,24 @@ public class DestNameExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey,queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
final List<AMQQueue> queues = _index.get(routingKey);
return queues != null && queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
final List<AMQQueue> queues = _index.get(routingKey);
return queues != null && !queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
for (List<AMQQueue> queues : bindings.values())
@@ -212,7 +217,7 @@ public class DestNameExchange extends AbstractExchange
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_index.getBindingsMap().isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index d88dcb11f7..b55dbcc792 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -230,21 +230,26 @@ public class DestWildExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return (queues != null) && queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return (queues != null) && !queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
for (List<AMQQueue> queues : _routingKey2queues.values())
{
@@ -257,7 +262,7 @@ public class DestWildExchange extends AbstractExchange
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_routingKey2queues.isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 9fa7b56ba7..37cd85a8f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Map;
import java.util.List;
+import java.util.Map;
public interface Exchange
{
@@ -55,6 +55,17 @@ public interface Exchange
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
+ * @param routingKey
+ * @param arguments
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key
* @param routingKey
@@ -62,7 +73,7 @@ public interface Exchange
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+ boolean isBound(AMQShortString routingKey, AMQQueue queue);
/**
* Determines whether a message is routing to any queue using a specific _routing key
@@ -70,16 +81,17 @@ public interface Exchange
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey) throws AMQException;
+ boolean isBound(AMQShortString routingKey);
- boolean isBound(AMQQueue queue) throws AMQException;
+ boolean isBound(AMQQueue queue);
/**
* Returns true if this exchange has at least one binding associated with it.
* @return
* @throws AMQException
*/
- boolean hasBindings() throws AMQException;
+ boolean hasBindings();
Map<AMQShortString, List<AMQQueue>> getBindings();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index a65b105809..aa13f1d8ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -180,24 +180,29 @@ public class FanoutExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return (_queues != null) && !_queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_queues.isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index bcbca8becf..3544e9d1f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -241,17 +241,23 @@ public class HeadersExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ //fixme isBound here should take the arguements in to consideration.
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return hasBindings();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
for (Registration r : _bindings)
{
@@ -263,7 +269,7 @@ public class HeadersExchange extends AbstractExchange
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_bindings.isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 4dc67b1970..2b751ab692 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,7 +37,6 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -77,7 +77,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
-
+
if (body.routingKey == null)
{
body.routingKey = queue.getName();
@@ -98,8 +98,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
try
- {
- queue.bind(body.routingKey, body.arguments, exch);
+ {
+ if (!exch.isBound(body.routingKey, body.arguments, queue))
+ {
+ queue.bind(body.routingKey, body.arguments, exch);
+ }
}
catch (AMQInvalidRoutingKeyException rke)
{