diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java | 26 |
1 files changed, 8 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 68ad88c4cb..1ee1f35de6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -31,7 +31,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -240,7 +241,7 @@ public class HeadersExchange extends AbstractExchange } } - public void route(AMQMessage payload) throws AMQException + public void route(IncomingMessage payload) throws AMQException { FieldTable headers = getHeaders(payload.getContentHeaderBody()); if (_logger.isDebugEnabled()) @@ -248,8 +249,10 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { + if (e.binding.matches(headers)) { if (_logger.isDebugEnabled()) @@ -257,25 +260,12 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": delivering message with headers " + headers + " to " + e.queue.getName()); } - payload.enqueue(e.queue); - routed = true; - } - } - if (!routed) - { - - String msg = "Exchange " + getName() + ": message not routable."; + queues.add(e.queue); - if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate()) - { - throw new NoRouteException(msg, payload); - } - else - { - _logger.warn(msg); + routed = true; } - } + payload.enqueue(queues); } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) |