diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
commit | a74a60f8fe5d15deb665cae72af8bae110e195d9 (patch) | |
tree | e0b5a10c8c765bc52d5abc4c1674dc9da61cdffe /java | |
parent | 1a88e5743df4f7eb18fe06cfbe56e797d6bca6ea (diff) | |
download | qpid-python-a74a60f8fe5d15deb665cae72af8bae110e195d9.tar.gz |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
26 files changed, 1501 insertions, 1661 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 0d05307cb4..58c2b33041 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange return getVirtualHost().getQueueRegistry(); } - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); } + public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return (b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments); + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey==null ? "" : routingKey.asString(), queue); + } + + public final boolean isBound(String bindingKey, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey) + { + return isBound(routingKey == null ? "" : routingKey.asString()); + } + + public final boolean isBound(String bindingKey) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey())) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQQueue queue) + { + for(Binding b : _bindings) + { + if(queue == b.getQueue()) + { + return true; + } + } + return false; + } - public boolean isBound(String bindingKey, AMQQueue queue) + @Override + public final boolean isBound(Map<String, Object> arguments, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + for(Binding b : _bindings) + { + if(queue == b.getQueue() && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; + } + + @Override + public final boolean isBound(String bindingKey, Map<String, Object> arguments) + { + for(Binding b : _bindings) + { + if(b.getBindingKey().equals(bindingKey) && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; } - public boolean isBound(String bindingKey) + public final boolean hasBindings() { - return isBound(new AMQShortString(bindingKey)); + return !_bindings.isEmpty(); } public Exchange getAlternateExchange() diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 4e136965a1..ccf955ed1c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -272,6 +272,18 @@ public class DefaultExchange implements Exchange } @Override + public boolean isBound(Map<String, Object> arguments, AMQQueue queue) + { + return (arguments == null || arguments.isEmpty()) && isBound(queue); + } + + @Override + public boolean isBound(String bindingKey, Map<String, Object> arguments) + { + return (arguments == null || arguments.isEmpty()) && isBound(bindingKey); + } + + @Override public boolean isBound(String bindingKey) { return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fc6ce15bc4..2e2a93d638 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -20,9 +20,18 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet; public class DirectExchange extends AbstractExchange { + + private static final Logger _logger = Logger.getLogger(DirectExchange.class); + private static final class BindingSet { private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>(); - private List<BaseQueue> _queues = new ArrayList<BaseQueue>(); + private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>(); + private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>(); public synchronized void addBinding(Binding binding) { @@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange private void recalculateQueues() { List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); + Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>(); for(Binding b : _bindings) { - if(!queues.contains(b.getQueue())) + + if(FilterSupport.argumentsContainFilter(b.getArguments())) { - queues.add(b.getQueue()); + try + { + MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue()); + filteredQueues.put(b.getQueue(),filter); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName() + + "' to exchange '" + b.getExchange().getName() + + "' with arguments: " + b.getArguments(), e); + } + + } + else + { + + if(!queues.contains(b.getQueue())) + { + queues.add(b.getQueue()); + } } } - _queues = queues; + _unfilteredQueues = queues; + _filteredQueues = filteredQueues; } - public List<BaseQueue> getQueues() + public List<BaseQueue> getUnfilteredQueues() { - return _queues; + return _unfilteredQueues; } public CopyOnWriteArraySet<Binding> getBindings() { return _bindings; } + + public boolean hasFilteredQueues() + { + return !_filteredQueues.isEmpty(); + } + + public Map<BaseQueue,MessageFilter> getFilteredQueues() + { + return _filteredQueues; + } } private final ConcurrentHashMap<String, BindingSet> _bindingsByKey = @@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange if(bindings != null) { - return bindings.getQueues(); + List<BaseQueue> queues = bindings.getUnfilteredQueues(); + + if(bindings.hasFilteredQueues()) + { + Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues); + + Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet()) + { + if(!queuesSet.contains(entry.getKey())) + { + MessageFilter filter = entry.getValue(); + if(filter.matches(payload)) + { + queuesSet.add(entry.getKey()); + } + } + } + if(queues.size() != queuesSet.size()) + { + queues = new ArrayList<BaseQueue>(queuesSet); + } + } + return queues; } else { @@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange } - - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey,queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - if(bindings != null) - { - return bindings.getQueues().contains(queue); - } - return false; - - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.getQueues().isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - - for (BindingSet bindings : _bindingsByKey.values()) - { - if(bindings.getQueues().contains(queue)) - { - return true; - } - - } - return false; - } - - public boolean hasBindings() - { - return !getBindings().isEmpty(); } protected void onBind(final Binding binding) @@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5a1d7f912..d483c3b29b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer Collection<Binding> getBindings(); + boolean isBound(String bindingKey); boolean isBound(String bindingKey, AMQQueue queue); - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); + boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); - boolean isBound(String bindingKey); + boolean isBound(Map<String, Object> arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map<String, Object> arguments); void removeReference(ExchangeReferrer exchange); @@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer boolean hasReferrers(); + + public interface BindingListener { void bindingAdded(Exchange exchange, Binding binding); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6ad5eb261e..cd830d69a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -20,11 +20,21 @@ */ package org.apache.qpid.server.exchange; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange /** * Maps from queue name to queue instances */ - private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>(); + private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>(); + private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + + private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings = + new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>(); + { + Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap(); + _filteredBindings.set(emptyMap); + } + + public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType(); @@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + _queues); - } - for(Binding b : getBindings()) { b.incrementMatches(); } - return new ArrayList<BaseQueue>(_queues.keySet()); - - } + final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues); - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey, queue); - } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(queue); - } + final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get(); + if(!_filteredQueues.isEmpty()) + { + for(AMQQueue q : _filteredQueues) + { + final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q); + if(!(bindingMessageFilterMap == null || result.contains(q))) + { + for(MessageFilter filter : bindingMessageFilterMap.values()) + { + if(filter.matches(payload)) + { + result.add(q); + break; + } + } + } + } - public boolean isBound(AMQShortString routingKey) - { + } - return (_queues != null) && !_queues.isEmpty(); - } - public boolean isBound(AMQQueue queue) - { - if (queue == null) + if (_logger.isDebugEnabled()) { - return false; + _logger.debug("Publishing message to queue " + result); } - return _queues.containsKey(queue); - } - public boolean hasBindings() - { - return !_queues.isEmpty(); + return result; + } - protected void onBind(final Binding binding) + + protected synchronized void onBind(final Binding binding) { AMQQueue queue = binding.getQueue(); assert queue != null; + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) + { - Integer oldVal; + Integer oldVal; + if(_queues.containsKey(queue)) + { + _queues.put(queue,_queues.get(queue)+1); + } + else + { + _queues.put(queue, ONE); + _unfilteredQueues.add(queue); + // No longer any reason to check filters for this queue + _filteredQueues.remove(queue); + } - if((oldVal = _queues.putIfAbsent(queue, ONE)) != null) + } + else { - Integer newVal = oldVal+1; - while(!_queues.replace(queue, oldVal, newVal)) + try { - oldVal = _queues.get(queue); - if(oldVal == null) + + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + final + MessageFilter messageFilter = + FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue()); + + if(bindingsForQueue != null) { - oldVal = _queues.putIfAbsent(queue, ONE); - if(oldVal == null) + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.put(binding, messageFilter); + } + else + { + bindingsForQueue = Collections.singletonMap(binding, messageFilter); + if(!_unfilteredQueues.contains(queue)) { - break; + _filteredQueues.add(queue); } } - newVal = oldVal + 1; + + filteredBindings.put(binding.getQueue(), bindingsForQueue); + + _filteredBindings.set(filteredBindings); + + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e); + return; } } - if (_logger.isDebugEnabled()) { _logger.debug("Binding queue " + queue - + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this); + + " with routing key " + binding.getBindingKey() + " to exchange " + this); } } - protected void onUnbind(final Binding binding) + protected synchronized void onUnbind(final Binding binding) { AMQQueue queue = binding.getQueue(); - Integer oldValue = _queues.get(queue); - - boolean done = false; - - while(!(done || oldValue == null)) + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) { - while(!(done || oldValue == null) && oldValue.intValue() == 1) + Integer oldValue = _queues.remove(queue); + if(ONE.equals(oldValue)) { - if(!_queues.remove(queue, oldValue)) + // should start checking filters for this queue + if(_filteredBindings.get().containsKey(queue)) { - oldValue = _queues.get(queue); - } - else - { - done = true; + _filteredQueues.add(queue); } + _unfilteredQueues.remove(queue); } - while(!(done || oldValue == null) && oldValue.intValue() != 1) + else { - Integer newValue = oldValue - 1; - if(!_queues.replace(queue, oldValue, newValue)) - { - oldValue = _queues.get(queue); - } - else - { - done = true; - } + _queues.put(queue,oldValue-1); } } + else // we are removing a binding with filters + { + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + if(bindingsForQueue.size()>1) + { + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.remove(binding); + filteredBindings.put(binding.getQueue(),bindingsForQueue); + } + else + { + _filteredQueues.remove(queue); + } + _filteredBindings.set(filteredBindings); + + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java new file mode 100644 index 0000000000..880c9a2cf6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.exchange; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.Filterable; + +public class FilterSupport +{ + private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = + Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>()); + + static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException + { + final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException + { + WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) + { + try + { + selector = new JMSSelectorFilter(selectorString); + } + catch (ParseException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (SelectorParsingException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (TokenMgrError e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); + } + return selector; + } + + static boolean argumentsContainFilter(final FieldTable args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainFilter(final Map<String, Object> args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainNoLocal(final Map<String, Object> args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString())); + } + + + static boolean argumentsContainNoLocal(final FieldTable args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); + } + + + static boolean argumentsContainJMSSelector(final Map<String,Object> args) + { + return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String) + && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; + } + + + static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + + static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { + return false; + } + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index b6f5f973f4..eb4a84a5b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.AMQMessageHeader; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.Filterable; /** * Defines binding and matching based on a set of headers. @@ -44,13 +48,14 @@ class HeadersBinding private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; + private MessageFilter _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * + * * @param binding the binding to create a header binding using */ public HeadersBinding(Binding binding) @@ -66,9 +71,30 @@ class HeadersBinding _mappings = null; } } - + private void initMappings() { + if(FilterSupport.argumentsContainFilter(_mappings)) + { + try + { + _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue()); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName() + +"' to exchange '"+_binding.getExchange().getName() + +"' with arguments: " + _binding.getArguments()); + _filter = new MessageFilter() + { + @Override + public boolean matches(Filterable message) + { + return false; + } + }; + } + } for(Map.Entry<String, Object> entry : _mappings.entrySet()) { String propertyName = entry.getKey(); @@ -87,7 +113,7 @@ class HeadersBinding } } } - + public Binding getBinding() { return _binding; @@ -111,6 +137,11 @@ class HeadersBinding } } + public boolean matches(InboundMessage message) + { + return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + } + private boolean and(AMQMessageHeader headers) { if(headers.containsHeaders(required)) @@ -215,7 +246,7 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } - + @Override public boolean equals(final Object o) { @@ -250,4 +281,4 @@ class HeadersBinding return true; } -}
\ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 746c8ac6bc..9fb745d553 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); - + private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers = new CopyOnWriteArrayList<HeadersBinding>(); - + public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType(); public HeadersExchange() @@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - AMQMessageHeader header = payload.getMessageHeader(); if (_logger.isDebugEnabled()) { - _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); + _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader()); } - + LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>(); - + for (HeadersBinding hb : _bindingHeaderMatchers) { - if (hb.matches(header)) + if (hb.matches(payload)) { Binding b = hb.getBinding(); - + b.incrementMatches(); - + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + b.getQueue().getNameShortString()); + payload.getMessageHeader() + " to " + b.getQueue().getNameShortString()); } queues.add(b.getQueue()); } } - - return new ArrayList<BaseQueue>(queues); - } - - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - CopyOnWriteArraySet<Binding> bindings; - if(bindingKey == null) - { - bindings = new CopyOnWriteArraySet<Binding>(getBindings()); - } - else - { - bindings = _bindingsByKey.get(bindingKey); - } - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(queue == null || binding.getQueue().equals(queue)) - { - return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - //fixme isBound here should take the arguements in to consideration. - return isBound(routingKey, queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - public boolean hasBindings() - { - return !getBindings().isEmpty(); + return new ArrayList<BaseQueue>(queues); } protected void onBind(final Binding binding) @@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange bindings = newBindings; } } - + if(_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 6d548be508..9d41856dc0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,21 +20,15 @@ */ package org.apache.qpid.server.exchange; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.SelectorParsingException; -import org.apache.qpid.filter.selector.ParseException; -import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; @@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.Filterable; public class TopicExchange extends AbstractExchange { @@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); - private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); - public TopicExchange() { super(TYPE); @@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ; AMQQueue queue = binding.getQueue(); FieldTable args = FieldTable.convertToFieldTable(binding.getArguments()); - + assert queue != null; assert rKey != null; @@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.replaceQueueFilter(queue, - createMessageFilter(oldArgs, queue), - createMessageFilter(args, queue)); + FilterSupport.createMessageFilter(oldArgs, queue), + FilterSupport.createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); + result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue)); } else { @@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange return; } } - + result.addBinding(binding); } @@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { @@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { result.addUnfilteredQueue(queue); } } - + result.addBinding(binding); _bindings.put(binding, args); } } - private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException - { - if(argumentsContainNoLocal(args)) - { - MessageFilter filter = new NoLocalFilter(queue); - - if(argumentsContainJMSSelector(args)) - { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); - } - return filter; - } - else - { - return createJMSSelectorFilter(args); - } - - } - - - private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException - { - final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); - JMSSelectorFilter selector = null; - - if(selectorRef == null || (selector = selectorRef.get())==null) - { - try - { - selector = new JMSSelectorFilter(selectorString); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); - } - return selector; - } - - private static boolean argumentsContainFilter(final FieldTable args) - { - return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); - } - - private static boolean argumentsContainNoLocal(final FieldTable args) - { - return args != null - && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) - && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); - } - - private static boolean argumentsContainJMSSelector(final FieldTable args) - { - return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) - && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); - } - public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange } - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments)); - - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return o.equals(arguments); - } - else - { - return false; - } - - } - } - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - Binding binding = new Binding(null, bindingKey, queue, this, arguments); - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return arguments.equals(FieldTable.convertToMap(o)); - } - else - { - return false; - } - } - - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(routingKey, null, queue); - } - - public boolean isBound(AMQShortString routingKey) - { - for(Binding b : _bindings.keySet()) - { - if(b.getBindingKey().equals(routingKey.toString())) - { - return true; - } - } - - return false; - } - - public boolean isBound(AMQQueue queue) - { - for(Binding b : _bindings.keySet()) - { - if(b.getQueue().equals(queue)) - { - return true; - } - } - - return false; - } - - public boolean hasBindings() - { - return !_bindings.isEmpty(); - } - private boolean deregisterQueue(final Binding binding) { if(_bindings.containsKey(binding)) @@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange FieldTable bindingArgs = _bindings.remove(binding); AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); - + result.removeBinding(binding); - - if(argumentsContainFilter(bindingArgs)) + + if(FilterSupport.argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); + result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs, + binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } - private static final class NoLocalFilter implements MessageFilter - { - private final AMQQueue _queue; - - public NoLocalFilter(AMQQueue queue) - { - _queue = queue; - } - - public boolean matches(Filterable message) - { - InboundMessage inbound = (InboundMessage) message; - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); - - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - - if (o == null || getClass() != o.getClass()) - { - return false; - } - - NoLocalFilter that = (NoLocalFilter) o; - - return _queue == null ? that._queue == null : _queue.equals(that._queue); - } - - @Override - public int hashCode() - { - return _queue != null ? _queue.hashCode() : 0; - } - } - - private static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index b4eb41684d..2e6a98d81b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { + String message = "Queue " + queueName + " not bound with routing key " + + body.getRoutingKey() + " to exchange " + exchangeName; + + if(message.length()>255) + { + message = message.substring(0,254); + } response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString("Queue " + queueName + " not bound with routing key " + - body.getRoutingKey() + " to exchange " + exchangeName)); // replyText + new AMQShortString(message)); // replyText } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d8d245e255..110c7be50a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate if(queueMatched) { - result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue); + result.setKeyNotMatched(!keyMatched); + if(method.hasArguments() && keyMatched) + { + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)); + } } else { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } - if(method.hasArguments()) - { - result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null)); - } - } else if (method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null)); + result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); } } @@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate { if(method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null)); + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index f4c0fec6c9..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AbstractHeadersExchangeTestBase extends QpidTestCase -{ - private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); - - private final HeadersExchange exchange = new HeadersExchange(); - private final Set<TestQueue> queues = new HashSet<TestQueue>(); - private VirtualHost _virtualHost; - private int count; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - String queueName = "Queue" + (++count); - - return bind(queueName, queueName, getHeadersMap(bindings)); - } - - protected void unbind(TestQueue queue, String... bindings) throws AMQException - { - String queueName = queue.getName(); - exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings))); - } - - protected int getCount() - { - return count; - } - - private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException - { - TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost); - queues.add(queue); - exchange.onBind(new Binding(null, key, queue, exchange, args)); - return queue; - } - - - protected int route(Message m) throws AMQException - { - m.getIncomingMessage().headersReceived(System.currentTimeMillis()); - m.route(exchange); - if(m.getIncomingMessage().allContentReceived()) - { - for(BaseQueue q : m.getIncomingMessage().getDestinationQueues()) - { - q.enqueue(m); - } - } - return m.getIncomingMessage().getDestinationQueues().size(); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException - { - int queueCount = route(m); - - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - - if(expectReturn) - { - assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount); - } - - } - - static Map<String,Object> getHeadersMap(String... entries) - { - if(entries == null) - { - return null; - } - - Map<String,Object> headers = new HashMap<String,Object>(); - - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - - static final class MessagePublishInfoImpl implements MessagePublishInfo - { - private AMQShortString _exchange; - private boolean _immediate; - private boolean _mandatory; - private AMQShortString _routingKey; - - public MessagePublishInfoImpl(AMQShortString routingKey) - { - _routingKey = routingKey; - } - - public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public boolean isImmediate() - { - return _immediate; - - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - - public void setExchange(AMQShortString exchange) - { - _exchange = exchange; - } - - public void setImmediate(boolean immediate) - { - _immediate = immediate; - } - - public void setMandatory(boolean mandatory) - { - _mandatory = mandatory; - } - - public void setRoutingKey(AMQShortString routingKey) - { - _routingKey = routingKey; - } - } - - static MessagePublishInfo getPublishRequest(final String id) - { - return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.setProperties(getProperties(headers)); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends SimpleAMQQueue - { - private final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); - - public String toString() - { - return getNameShortString().toString(); - } - - public TestQueue(AMQShortString name, VirtualHost host) throws AMQException - { - super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP); - host.getQueueRegistry().registerQueue(this); - } - - - - /** - * We override this method so that the default behaviour, which attempts to use a delivery manager, is - * not invoked. It is unnecessary since for this test we only care to know whether the message was - * sent to the queue; the queue processing logic is not being tested. - * @param msg - * @throws AMQException - */ - @Override - public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException - { - messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); - final QueueEntry queueEntry = new QueueEntry() - { - - public AMQQueue getQueue() - { - return null; - } - - public AMQMessage getMessage() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public boolean expired() throws AMQException - { - return false; - } - - public boolean isAvailable() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean delete() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public boolean isAcquiredBy(Subscription subscription) - { - return false; - } - - public void release() - { - - } - - public void setRedelivered() - { - - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public boolean isPersistent() - { - return false; - } - - public boolean isRedelivered() - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public void reject() - { - - } - - public boolean isRejectedBy(long subscriptionId) - { - return false; - } - - public void dequeue() - { - - } - - public void dispose() - { - - } - - public void discard() - { - - } - - public void routeToAlternate() - { - - } - - public boolean isQueueDeleted() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - return false; - } - - public int compareTo(final QueueEntry o) - { - return 0; - } - - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() - { - return false; - } - - public QueueEntry getNextNode() - { - return null; - } - - public QueueEntry getNextValidEntry() - { - return null; - } - - public int getDeliveryCount() - { - return 0; - } - - public void incrementDeliveryCount() - { - } - - public void decrementDeliveryCount() - { - } - }; - - if(action != null) - { - action.onEnqueue(queueEntry); - } - - } - - boolean isInQueue(Message msg) - { - return messages.contains(msg); - } - - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static AtomicLong _messageId = new AtomicLong(); - - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final AMQProtocolSession publisher) - { - super(info); - } - - - public ContentHeaderBody getContentHeader() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; - - - Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException - { - this(protocolSession, id, getHeaders(headers)); - } - - Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException - { - this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); - } - - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(AMQProtocolSession protocolsession, long messageId, - MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(new MockStoredMessage(messageId, publish, header)); - - StoredMessage<MessageMetaData> storedMessage = getStoredMessage(); - - int pos = 0; - for(ContentBody body : bodies) - { - storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload())); - pos += body.getPayload().length; - } - - _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); - _incoming.setContentHeaderBody(header); - - - } - - - private Message(AMQMessage msg) throws AMQException - { - super(msg.getStoredMessage()); - } - - - - void route(Exchange exchange) throws AMQException - { - _incoming.enqueue(exchange.route(_incoming)); - } - - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } - } - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 2ddb417d5d..7b7e2ec346 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -21,22 +21,32 @@ package org.apache.qpid.server.exchange; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class FanoutExchangeTest extends TestCase { @@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); } @@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, (FieldTable) null, queue)); + _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue)); } public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, queue)); + _exchange.isBound(new AMQShortString("matters"), queue)); } public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException @@ -95,9 +107,86 @@ public class FanoutExchangeTest extends TestCase private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException { + AMQQueue queue = mockQueue(); + _exchange.addBinding("matters", queue, null); + return queue; + } + + private AMQQueue mockQueue() + { AMQQueue queue = mock(AMQQueue.class); when(queue.getVirtualHost()).thenReturn(_virtualHost); - _exchange.addBinding("does not matter", queue, null); return queue; } + + public void testRoutingWithSelectors() throws Exception + { + AMQQueue queue1 = mockQueue(); + AMQQueue queue2 = mockQueue(); + + _exchange.addBinding("key",queue1, null); + _exchange.addBinding("key",queue2, null); + + + List<? extends BaseQueue> result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); + + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.removeBinding("key",queue2,null); + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + result = _exchange.route(mockMessage(false)); + + assertEquals("Expected message to be routed to queue1 only", 1, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertFalse("Expected queue2 not to be routed to", result.contains(queue2)); + + _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); + + + result = _exchange.route(mockMessage(false)); + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + } + + private InboundMessage mockMessage(boolean val) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader("select")).thenReturn(true); + when(header.getHeader("select")).thenReturn(val); + when(header.getHeaderNames()).thenReturn(Collections.singleton("select")); + when(header.containsHeaders(anySet())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return names.size() == 1 && names.contains("select"); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index bd6a02d69b..2b965358e0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -20,106 +20,230 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import junit.framework.TestCase; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HeadersExchangeTest extends TestCase { - private AMQProtocolSession _protocolSession; + private HeadersExchange _exchange; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock()); + + CurrentActor.setDefault(mock(LogActor.class)); + _exchange = new HeadersExchange(); + _virtualHost = mock(VirtualHost.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + } - @Override - public void tearDown() throws Exception + protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception { - BrokerTestHelper.tearDown(); - super.tearDown(); + List<? extends BaseQueue> results = _exchange.route(msg); + List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); + unexpected.removeAll(Arrays.asList(expected)); + assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); + List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected)); + missing.removeAll(results); + assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty()); + assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size()); } - public void testSimple() throws AMQException + + private AMQQueue createAndBind(final String name, String... arguments) + throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); - - Message m7 = new Message(_protocolSession, "Message7", "XXXXX"); - - MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); - pb7.setMandatory(true); - routeAndTest(m7,true); - - Message m8 = new Message(_protocolSession, "Message8", "F0000"); - MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); - pb8.setMandatory(true); - routeAndTest(m8,false,q1); + return createAndBind(name, getArgsMapFromStrings(arguments)); + } + + private Map<String, Object> getArgsMapFromStrings(String... arguments) + { + Map<String, Object> map = new HashMap<String,Object>(); + + for(String arg : arguments) + { + if(arg.contains("=")) + { + String[] keyValue = arg.split("=",2); + map.put(keyValue[0],keyValue[1]); + } + else + { + map.put(arg,null); + } + } + return map; + } + private AMQQueue createAndBind(final String name, Map<String, Object> arguments) + throws Exception + { + AMQQueue q = create(name); + bind(name, arguments, q); + return q; + } + private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q) + throws AMQSecurityException, AMQInternalException + { + _exchange.addBinding(bindingKey,q,arguments); } - public void testAny() throws AMQException + private AMQQueue create(String name) { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); + AMQQueue q = mock(AMQQueue.class); + when(q.toString()).thenReturn(name); + when(q.getVirtualHost()).thenReturn(_virtualHost); + return q; } - public void testMandatory() throws AMQException + + public void testSimple() throws Exception { - bindDefault("F0000"); - Message m1 = new Message(_protocolSession, "Message1", "XXXXX"); - Message m2 = new Message(_protocolSession, "Message2", "F0000"); - MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); - pb1.setMandatory(true); - MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); - pb2.setMandatory(true); - routeAndTest(m1,true); + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + AMQQueue q4 = createAndBind("Q4", "F0001=Bear"); + AMQQueue q5 = createAndBind("Q5", "F0000", "F0001"); + AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear"); + AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear"); + AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } - - public void testOnUnbind() throws AMQException + + public void testAny() throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3); - - unbind(q1,"F0000"); - routeAndTest(new Message(_protocolSession, "Message4", "F0000")); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2); + AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any"); + AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any"); + AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any"); + AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } + + public void testOnUnbind() throws Exception + { + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3); + + _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000")); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000"))); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2); + } + + + public void testWithSelectors() throws Exception + { + AMQQueue q1 = create("Q1"); + AMQQueue q2 = create("Q2"); + bind("q1",getArgsMapFromStrings("F"), q1); + bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1); + bind("q2",getArgsMapFromStrings("F=1"), q2); + + routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2); + + + AMQQueue q3 = create("Q3"); + bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1); + bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3); + + } + + private InboundMessage mockMessage(final Map<String, Object> headerValues) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader(anyString())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.containsKey((String) invocation.getArguments()[0]); + } + }); + when(header.getHeader(anyString())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.get((String) invocation.getArguments()[0]); + } + }); + when(header.getHeaderNames()).thenReturn(headerValues.keySet()); + when(header.containsHeaders(anySet())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return headerValues.keySet().containsAll(names); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; } - public static junit.framework.Test suite() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index b9e9a33cd6..922cc1e2a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -31,7 +31,7 @@ public class AMQHeadersExchange extends AMQDestination { public AMQHeadersExchange(BindingURL binding) { - this(binding.getExchangeName()); + super(binding); } public AMQHeadersExchange(String name) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e784e903fa..018a1ec851 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -440,7 +440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // If the session has been closed don't waste time creating a thread to do // flow control if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) - { + { // Only execute change if previous state // was False if (!_suspendState.getAndSet(true)) @@ -535,7 +535,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract AMQException getLastException(); - + public void checkNotClosed() throws JMSException { try @@ -553,7 +553,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ssnClosed.setLinkedException(ex); ssnClosed.initCause(ex); throw ssnClosed; - } + } else { throw ise; @@ -987,13 +987,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method return createDurableSubscriber(topic, name, null, false); } - + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { checkNotClosed(); Topic origTopic = checkValidTopic(topic, true); - + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); if (dest.getDestSyntax() == DestSyntax.ADDR && !dest.isAddressResolved()) @@ -1015,20 +1015,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw toJMSException("Error when verifying destination", e); } } - + String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; - + _subscriberDetails.lock(); try { TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name); - + // Not subscribed to this name in the current session if (subscriber == null) { // After the address is resolved routing key will not be null. AMQShortString topicName = dest.getRoutingKey(); - + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1046,8 +1046,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { Map<String,Object> args = new HashMap<String,Object>(); - - // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all @@ -1060,16 +1060,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); - boolean isQueueBoundForTopicAndSelector = + boolean isQueueBoundForTopicAndSelector = isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } + else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match + { + try + { + bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(), + FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true); + } + catch(AMQException e) + { + throw toJMSException("Error when checking binding",e); + } + } } } - else + else { // Subscribed with the same topic and no current / previous or same selector if (subscriber.getTopic().equals(topic) @@ -1100,7 +1112,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _subscriberAccess.unlock(); } - + return subscriber; } catch (TransportException e) @@ -1193,19 +1205,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (syntax == AMQDestination.DestSyntax.BURL) { // For testing we may want to use the prefix - return new AMQQueue(getDefaultQueueExchangeName(), + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); } else { AMQQueue queue = new AMQQueue(queueName); return queue; - + } } else { - return new AMQQueue(queueName); + return new AMQQueue(queueName); } } catch (URISyntaxException urlse) @@ -1341,7 +1353,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return new QueueReceiverAdaptor(dest, consumer); } - + private Queue validateQueue(Destination dest) throws InvalidDestinationException { if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) @@ -1497,9 +1509,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - return new AMQTopic(topicName); + return new AMQTopic(topicName); } - + } catch (URISyntaxException urlse) { @@ -1646,16 +1658,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Message[" + message.toString() + "] received in session"); } _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) throws AMQException { + declareAndBind(amqd, new FieldTable()); + } + + + public void declareAndBind(AMQDestination amqd, FieldTable arguments) + throws + AMQException + { declareExchange(amqd, false); AMQShortString queueName = declareQueue(amqd, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd); } /** @@ -1681,7 +1701,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Not that this does not necessarily mean that the recovery has failed, but simply that it is * not possible to tell if it has or not. * @todo Be aware of possible changes to parameter order as versions change. - * + * * Strategy for handling recover. * Flush any acks not yet sent. * Stop the message flow. @@ -1730,7 +1750,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } sendRecover(); - + markClean(); if (!isSuspended) @@ -1755,7 +1775,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected abstract void sendRecover() throws AMQException, FailoverException; protected abstract void flushAcknowledgments(); - + public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -1851,7 +1871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setMessageListener(MessageListener listener) throws JMSException { } - + /** * @see #unsubscribe(String, boolean) */ @@ -1866,20 +1886,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); } } - + /** * Unsubscribe from a subscription. - * + * * @param name the name of the subscription to unsubscribe * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the * queue is not bound, possibly due to the subscription being closed. - * @throws JMSException on + * @throws JMSException on * @throws InvalidDestinationException */ private void unsubscribe(String name, boolean safe) throws JMSException { TopicSubscriberAdaptor<C> subscriber; - + _subscriberDetails.lock(); try { @@ -1896,11 +1916,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _subscriberDetails.unlock(); } - + if (subscriber != null) { subscriber.close(); - + // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } @@ -1917,7 +1937,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + " Requesting queue deletion regardless."); } - + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else // Queue Browser @@ -1936,8 +1956,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } protected C createConsumerImpl(final Destination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException + final int prefetchLow, final boolean noLocal, + final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -2111,7 +2132,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException; public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; - + public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException; /** @@ -2844,14 +2865,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { declareExchange(amqd, nowait); } - + if (_delareQueues || amqd.isNameRequired()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } - bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); + if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) + { + bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), + amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait); + } + } - + AMQShortString queueName = amqd.getAMQQueueName(); // store the consumer queue name @@ -2895,10 +2921,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + throws AMQException; + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, boolean noLocal) throws AMQException; - + private void registerProducer(long producerId, MessageProducer producer) { _producers.put(new Long(producerId), producer); @@ -3189,7 +3218,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } - + public void run() { if (_dispatcherLogger.isDebugEnabled()) @@ -3304,7 +3333,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (updateRollbackMark(current, deliveryTag)) { _rollbackMark.compareAndSet(current, deliveryTag); - } + } } private void notifyConsumer(UnprocessedMessage message) @@ -3424,7 +3453,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return super.isClosing() || _connection.isClosing(); } - + public boolean isDeclareExchanges() { return _declareExchanges; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e2cfe0e27f..1baaff738b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -143,7 +143,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; private RangeSet unacked = RangeSetFactory.createRangeSet(); - private int unackedCount = 0; + private int unackedCount = 0; /** * Used to store the range of in tx messages @@ -292,7 +292,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { flushAcknowledgments(false); } - + void flushAcknowledgments(boolean setSyncBit) { synchronized (unacked) @@ -310,7 +310,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { messageAcknowledge(ranges,accept,false); } - + void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit) { final Session ssn = getQpidSession(); @@ -354,15 +354,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - + for (AMQShortString rk: destination.getBindingKeys()) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), args); } } @@ -371,10 +371,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); bindings.addAll(destination.getNode().getBindings()); - + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; - + for (Binding binding: bindings) { // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. @@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } String queue = binding.getQueue() == null? queueName.asString(): binding.getQueue(); - - String exchange = binding.getExchange() == null ? + + String exchange = binding.getExchange() == null ? defaultExchange : binding.getExchange(); - - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, + getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), - binding.getArgs()); + binding.getArgs()); } } - + if (!nowait) { // We need to sync so that we get notify of an error. @@ -561,20 +561,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException { return isQueueBound(exchangeName,queueName,routingKey,null); } - public boolean isQueueBound(final AMQDestination destination) throws JMSException + public boolean isQueueBound(final AMQDestination destination) { return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); } public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) - throws JMSException { - String rk = null; + String rk = null; if (bindingKeys != null && bindingKeys.length>0) { rk = bindingKeys[0].toString(); @@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { rk = routingKey.toString(); } - + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } - + public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) { boolean res; @@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } else - { + { if (args == null) { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult .getQueueNotMatched()); } else { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched()); } } return res; } + @Override + protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + { + return isQueueBound(exchangeName, amqQueueName, routingKey); + } + /** * This method is invoked when a consumer is created * Registers the consumer with the broker @@ -730,7 +734,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } /** - * deletes an exchange + * deletes an exchange */ public void sendExchangeDelete(final String name, final boolean nowait) throws AMQException, FailoverException @@ -763,12 +767,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } if (amqd.getDestSyntax() == DestSyntax.BURL) - { + { Map<String,Object> arguments = new HashMap<String,Object>(); if (noLocal) - { + { arguments.put(AddressHelper.NO_LOCAL, true); - } + } getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, @@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic arguments, node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, - node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } // passive --> false @@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { long capacity = consumer.getCapacity(); - + if (capacity == 0) { if (consumer.getMessageListener() != null) @@ -1090,20 +1094,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return AMQMessageDelegateFactory.FACTORY_0_10; } - + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); - match = !result.getNotFound(); + match = !result.getNotFound(); Node node = dest.getNode(); - + if (match) { if (assertNode) { - match = (result.getDurable() == node.isDurable()) && - (node.getExchangeType() != null && + match = (result.getDurable() == node.isDurable()) && + (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); } @@ -1125,7 +1129,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; @@ -1137,7 +1141,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (match && assertNode) { - match = (result.getDurable() == node.isDurable()) && + match = (result.getDurable() == node.isDurable()) && (result.getAutoDelete() == node.isAutoDelete()) && (result.getExclusive() == node.isExclusive()) && (matchProps(result.getArguments(),node.getDeclareArgs())); @@ -1165,17 +1169,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } return match; } - + private boolean matchProps(Map<String,Object> target,Map<String,Object> source) { boolean match = true; for (String key: source.keySet()) { - match = target.containsKey(key) && + match = target.containsKey(key) && target.get(key).equals(source.get(key)); - - if (!match) - { + + if (!match) + { StringBuffer buf = new StringBuffer(); buf.append("Property given in address did not match with the args sent by the broker."); buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); @@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } } - + return match; } /** * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, + * 2. if type == queue, * 2.1 verify queue exists or create if create == true * 2.2 If not throw exception - * + * * 3. if type == exchange, * 3.1 verify exchange exists or create if create == true * 3.2 if not throw exception * 3.3 if exchange exists (or created) create subscription queue. */ - + @SuppressWarnings("deprecation") public void resolveAddress(AMQDestination dest, boolean isConsumer, @@ -1211,21 +1215,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || + boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || (!isConsumer && dest.getAssert() == AddressOption.SENDER); - + boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - + + + int type = resolveAddressType(dest); - + switch (type) { - case AMQDestination.QUEUE_TYPE: + case AMQDestination.QUEUE_TYPE: { if(createNode) { @@ -1239,24 +1243,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic break; } } - - case AMQDestination.TOPIC_TYPE: + + case AMQDestination.TOPIC_TYPE: { if(createNode) - { + { setLegacyFiledsForTopicType(dest); verifySubject(dest); handleExchangeNodeCreation(dest); break; } else if (isExchangeExist(dest,assertNode)) - { + { setLegacyFiledsForTopicType(dest); verifySubject(dest); break; } } - + default: throw new AMQException( "The name '" + dest.getAddressName() + @@ -1265,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setAddressResolved(System.currentTimeMillis()); } } - + public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1292,14 +1296,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } dest.setAddressType(type); return type; - } + } } - + private void verifySubject(AMQDestination dest) throws AMQException { if (dest.getSubject() == null || dest.getSubject().trim().equals("")) { - + if ("topic".equals(dest.getExchangeClass().toString())) { dest.setRoutingKey(new AMQShortString("#")); @@ -1364,12 +1368,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? + dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: - new AMQShortString(node.getExchangeType())); + new AMQShortString(node.getExchangeType())); dest.setRoutingKey(new AMQShortString(dest.getSubject())); } - + protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1412,7 +1416,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); } - + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3097b33da3..9a9da62f2a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // thread. // We can't close the session if we are already in the process of // closing/closed the connection. - + if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) { @@ -381,10 +381,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { public AMQMethodEvent execute() throws AMQException, FailoverException { - AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody - (exchangeName, routingKey, queueName).generateFrame(getChannelId()); - - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + return sendExchangeBound(exchangeName, routingKey, queueName); } }, getAMQConnection()).execute(); @@ -398,7 +395,38 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - } + } + + @Override + protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws AMQException + { + + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, routingKey, queueName); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.getReplyCode() == 0); + } + + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, + AMQShortString routingKey, + AMQShortString queueName) throws AMQException, FailoverException + { + AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, @@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe JMSException ex = new JMSException("Error creating producer"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } } @@ -609,7 +637,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); - + return null; } }, getAMQConnection()).execute(); @@ -671,7 +699,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe false, null).generateFrame(getChannelId()); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); return okHandler.getMessageCount(); } @@ -689,9 +717,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { return AMQMessageDelegateFactory.FACTORY_0_8; } - + public void sync() throws AMQException - { + { declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } @@ -702,10 +730,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } - + protected void flushAcknowledgments() { - + } @Override @@ -744,7 +772,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for AMQStateManager manager = getProtocolHandler().getStateManager(); - + Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && e != null) @@ -752,15 +780,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (e instanceof AMQException) { return (AMQException) e; - } + } else { AMQException amqe = new AMQException(AMQConstant - .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), + .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), e.getMessage(), e.getCause()); return amqe; } - } + } else { return null; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 96cd209447..d78e725a5d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -42,7 +42,7 @@ public class AMQTopic extends AMQDestination implements Topic { super(address); } - + protected AMQTopic() { super(); @@ -89,6 +89,12 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } + + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + { + super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable); + } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { @@ -114,10 +120,10 @@ public class AMQTopic extends AMQDestination implements Topic AMQTopic t = new AMQTopic(qpidTopic.getAddress()); AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); + t.getLink().setName(queueName.asString()); t.getLink().getSubscriptionQueue().setAutoDelete(false); t.getLink().setDurable(true); - + // The legacy fields are also populated just in case. t.setQueueName(queueName); t.setAutoDelete(false); @@ -134,7 +140,7 @@ public class AMQTopic extends AMQDestination implements Topic } else { - return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, + return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getExchangeClass(), qpidTopic.getRoutingKey(), false, getDurableTopicQueueName(subscriptionName, connection), true); } @@ -165,7 +171,7 @@ public class AMQTopic extends AMQDestination implements Topic return null; } } - + @Override public AMQShortString getExchangeName() { @@ -181,9 +187,9 @@ public class AMQTopic extends AMQDestination implements Topic public AMQShortString getRoutingKey() { - if (super.getRoutingKey() != null) + if (super.getRoutingKey() != null) { - return super.getRoutingKey(); + return super.getRoutingKey(); } else if (getSubject() != null) { diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 57cd2a1ff5..f50e65214c 100644 --- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -40,7 +40,6 @@ public enum AMQPFilterTypes /** The identifying string for the filter type. */ private final AMQShortString _value; - /** * Creates a new filter type from its identifying string. * @@ -60,4 +59,10 @@ public enum AMQPFilterTypes { return _value; } + + @Override + public String toString() + { + return _value.asString(); + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 3d116f1b1b..91f56f369b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -83,9 +83,12 @@ public class ReturnUnroutableMandatoryMessageTest extends QpidBrokerTestCase imp AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); - consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft); + consumerSession.declareAndBind(queue, ft); + + consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index dfd26b474a..646c17d1f2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -111,7 +111,7 @@ public class BindingLoggingTest extends AbstractTestLogging String messageID = "BND-1001"; String queueName = _queue.getQueueName(); String exchange = "direct/amq.direct"; - String message = "Create : Arguments : {x-filter-jms-selector=}"; + String message = "Create"; validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName); } @@ -145,7 +145,7 @@ public class BindingLoggingTest extends AbstractTestLogging // Perform full testing on the binding String message = getMessageString(fromMessage(getLogMessage(results, 0))); - + validateLogMessage(getLogMessage(results, 0), messageID, message, "topic/amq.topic", "topic", "clientid:" + getName()); @@ -208,17 +208,17 @@ public class BindingLoggingTest extends AbstractTestLogging validateMessageID(messageID, log); String subject = fromSubject(log); - + validateBindingDeleteArguments(subject, "/test"); assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); } - + private void validateBindingDeleteArguments(String subject, String vhostName) { String routingKey = AbstractTestLogSubject.getSlice("rk", subject); - + assertTrue("Routing Key does not start with TempQueue:"+routingKey, routingKey.startsWith("TempQueue")); assertEquals("Virtualhost not correct.", vhostName, diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 3783b0bd02..67a2988ad1 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.test.client.destination; @@ -49,7 +49,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class); private Connection _connection; - + @Override public void setUp() throws Exception { @@ -57,20 +57,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _connection = getConnection() ; _connection.start(); } - + @Override public void tearDown() throws Exception { _connection.close(); super.tearDown(); } - + public void testCreateOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer prod; MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; @@ -84,7 +84,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -94,22 +94,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); - - + + // create always ------------------------------------------- addr1 = "ADDR:testQueue1; { create: always }"; dest = new AMQAnyDestination(addr1); - cons = jmsSession.createConsumer(dest); - + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; dest = new AMQAnyDestination(addr1); @@ -122,32 +122,32 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - - - cons = jmsSession.createConsumer(dest); - + + + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -157,17 +157,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; dest = new AMQAnyDestination(addr1); - + try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { @@ -176,84 +176,84 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + } - + public void testCreateQueue() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-queue/hello; " + - "{" + + "{" + "create: always, " + - "node: " + - "{" + + "node: " + + "{" + "durable: true ," + "x-declare: " + - "{" + + "{" + "exclusive: true," + - "arguments: {" + + "arguments: {" + "'qpid.max_size': 1000," + "'qpid.max_count': 100" + - "}" + - "}, " + - "x-bindings: [{exchange : 'amq.direct', key : test}, " + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.fanout'}," + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + "{exchange : 'amq.topic', key : 'a.#'}" + - "]," + - + "]," + + "}" + "}"; AMQDestination dest = new AMQAnyDestination(addr); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); cons.close(); - + // Even if the consumer is closed the queue and the bindings should be intact. - + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + dest.getAddressName(),"a.#", null)); + Map<String,Object> args = new HashMap<String,Object>(); args.put("x-match","any"); args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); - + MessageProducer prod = jmsSession.createProducer(dest); prod.send(jmsSession.createTextMessage("test")); - + MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue")); Message m = cons2.receive(1000); assertNotNull("Should receive message sent to my-queue",m); assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT)); } - + public void testCreateExchange() throws Exception { createExchangeImpl(false, false); @@ -283,21 +283,21 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String addr = "ADDR:my-exchange/hello; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:my-exchange/hello; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, useNonsenseArguments) + "}" + "}" + "}"; - + AMQDestination dest = new AMQAnyDestination(addr); MessageConsumer cons; @@ -322,20 +322,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("Unexpected exception whilst creating consumer: " + e); } } - + assertTrue("Exchange not created as expected",( (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); - + // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", - dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); - + (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + dest.getQueueName(),"hello", null)); + // The client should be able to query and verify the existence of my-exchange (QPID-2774) dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } - + private String createExchangeArgsString(final boolean withExchangeArgs, final boolean useNonsenseArguments) { @@ -366,60 +366,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + dest.getAddressName(),"test", null)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); - + Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } - + /** * Test goal: Verifies that a producer and consumer creation triggers the correct * behavior for x-bindings specified in node props. */ public void testBindQueueWithArgs() throws Exception { - + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; - - String addr = "node: " + - "{" + + + String addr = "node: " + + "{" + "durable: true ," + - "x-declare: " + - "{ " + + "x-declare: " + + "{ " + "auto-delete: true," + "arguments: {'qpid.max_count': 100}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + - "{exchange : 'amq.topic', key : 'a.#'}," + - headersBinding + + "{exchange : 'amq.topic', key : 'a.#'}," + + headersBinding + "]" + "}" + "}"; - + AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr); - MessageConsumer cons = jmsSession.createConsumer(dest1); - checkQueueForBindings(jmsSession,dest1,headersBinding); - + MessageConsumer cons = jmsSession.createConsumer(dest1); + checkQueueForBindings(jmsSession,dest1,headersBinding); + AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr); - MessageProducer prod = jmsSession.createProducer(dest2); - checkQueueForBindings(jmsSession,dest2,headersBinding); + MessageProducer prod = jmsSession.createProducer(dest2); + checkQueueForBindings(jmsSession,dest2,headersBinding); } - + /** * Test goal: Verifies the capacity property in address string is handled properly. * Test strategy: @@ -427,22 +427,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Creates consumer with client ack. * Sends 15 messages to the queue, tries to receive 10. * Tries to receive the 11th message and checks if its null. - * - * Since capacity is 10 and we haven't acked any messages, + * + * Since capacity is 10 and we haven't acked any messages, * we should not have received the 11th. - * + * * Acks the 10th message and verifies we receive the rest of the msgs. */ public void testCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}"); } - + public void testSourceAndTargetCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}"); } - + private void verifyCapacity(String address) throws Exception { if (!isCppBroker()) @@ -450,13 +450,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _logger.info("Not C++ broker, exiting test"); return; } - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination dest = new AMQAnyDestination(address); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); - + for (int i=0; i< 15; i++) { prod.send(jmsSession.createTextMessage("msg" + i) ); @@ -475,48 +475,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); } } - + /** * Test goal: Verifies if the new address format based destinations * can be specified and loaded correctly from the properties file. - * + * */ public void testLoadingFromPropertiesFile() throws Exception { - Hashtable<String,String> map = new Hashtable<String,String>(); - map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + + Hashtable<String,String> map = new Hashtable<String,String>(); + map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}"); - + map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'"); - + PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); Context ctx = props.getInitialContext(map); - - AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); + + AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2"); AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3"); - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons1 = jmsSession.createConsumer(dest1); + MessageConsumer cons1 = jmsSession.createConsumer(dest1); MessageConsumer cons2 = jmsSession.createConsumer(dest2); MessageConsumer cons3 = jmsSession.createConsumer(dest3); - + assertTrue("Destination1 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); - + assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); - + assertTrue("Destination2 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); - + assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); - + MessageProducer producer = jmsSession.createProducer(dest3); producer.send(jmsSession.createTextMessage("Hello")); TextMessage msg = (TextMessage)cons3.receive(1000); @@ -527,31 +527,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. * Test strategy: Creates and address with a default subject "topic1" * Creates a message with "qpid.subject"="topic2" and sends it. - * Verifies that the message goes to "topic2" instead of "topic1". + * Verifies that the message goes to "topic2" instead of "topic1". */ public void testOverridingSubject() throws Exception { Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); - + MessageProducer prod = jmsSession.createProducer(topic1); - + Message m = jmsSession.createTextMessage("Hello"); m.setStringProperty("qpid.subject", "topic2"); - + MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); - + prod.send(m); Message msg = consForTopic1.receive(1000); assertNull("message shouldn't have been sent to topic1",msg); - + msg = consForTopic2.receive(1000); - assertNotNull("message should have been sent to topic2",msg); - + assertNotNull("message should have been sent to topic2",msg); + } - + /** * Test goal: Verifies that session.createQueue method * works as expected both with the new and old addressing scheme. @@ -559,19 +559,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSessionCreateQueue() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Destination queue = ssn.createQueue("my-queue"); - MessageProducer prod = ssn.createProducer(queue); + MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method // default case queue = ssn.createQueue("ADDR:my-queue2"); @@ -586,34 +586,34 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue"; assertEquals(s,e.getCause().getCause().getMessage()); } - + // explicit create case queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); - prod = ssn.createProducer(queue); + prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession_0_10)ssn).isQueueBound("", "my-queue2","my-queue2", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method to create a more complicated queue String addr = "ADDR:amq.direct/x512; {" + - "link : {name : 'MY.RESP.QUEUE', " + + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - + cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } - + /** * Test goal: Verifies that session.creatTopic method works as expected * both with the new and old addressing scheme. @@ -635,68 +635,68 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Topic topic = ssn.createTopic("ACME"); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(topic); MessageConsumer cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method topic = ssn.createTopic("ADDR:ACME"); - prod = ssn.createProducer(topic); + prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - String addr = "ADDR:vehicles/bus; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:vehicles/bus; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, false) + "}" + "}, " + "link: {name : my-topic, " + "x-bindings: [{exchange : 'vehicles', key : car}, " + - "{exchange : 'vehicles', key : van}]" + - "}" + + "{exchange : 'vehicles', key : van}]" + + "}" + "}"; - + // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); cons = ssn.createConsumer(topic); prod = ssn.createProducer(topic); - + assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","bus", null)); - + assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","car", null)); - + assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","van", null)); - + Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "van"); prod.send(msg); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + /** * Test Goal : Verify the default subjects used for each exchange type. * The default for amq.topic is "#" and for the rest it's "" @@ -704,92 +704,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testDefaultSubjects() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); - + MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); - + queueProducer.send(ssn.createBytesMessage()); assertNotNull("The consumer subscribed to amq.direct " + "with empty binding key should have received the message ",queueCons.receive(1000)); - + topicProducer1.send(ssn.createTextMessage("25c")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"25c"); - + topicProducer2.send(ssn.createTextMessage("1000")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"1000"); } - + /** * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer. * This indirectly tests ring queues as well. */ public void testBrowseMode() throws Exception { - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " + "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}"; - + Destination dest = ssn.createQueue(addr); MessageConsumer browseCons = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); - + prod.send(ssn.createTextMessage("Test1")); prod.send(ssn.createTextMessage("Test2")); - + TextMessage msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test1"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test2"); - - browseCons.close(); + + browseCons.close(); prod.send(ssn.createTextMessage("Test3")); browseCons = ssn.createConsumer(dest); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the second message again",msg.getText(),"Test2"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3"); - + assertNull("Should not receive anymore messages",browseCons.receive(500)); } - + /** * Test Goal : When the same destination is used when creating two consumers, - * If the type == topic, verify that unique subscription queues are created, + * If the type == topic, verify that unique subscription queues are created, * unless subscription queue has a name. - * + * * If the type == queue, same queue should be shared. */ public void testSubscriptionForSameDestination() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); TextMessage m = (TextMessage)consumer1.receive(1000); assertEquals("Consumer1 should recieve message A",m.getText(),"A"); m = (TextMessage)consumer2.receive(1000); assertEquals("Consumer2 should recieve message A",m.getText(),"A"); - + consumer1.close(); consumer2.close(); - + dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}"); consumer1 = ssn.createConsumer(dest); try @@ -798,61 +798,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } _connection.close(); - + _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); dest = ssn.createTopic("ADDR:my_queue; {create: always}"); consumer1 = ssn.createConsumer(dest); consumer2 = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); - Message m1 = consumer1.receive(1000); + Message m1 = consumer1.receive(1000); Message m2 = consumer2.receive(1000); - + if (m1 != null) { - assertNull("Only one consumer should receive the message",m2); + assertNull("Only one consumer should receive the message",m2); } else { - assertNotNull("Only one consumer should receive the message",m2); + assertNotNull("Only one consumer should receive the message",m2); } } - + public void testXBindingsWithoutExchangeName() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String addr = "ADDR:MRKT; " + "{" + - "create: receiver," + + "create: receiver," + "node : {type: topic, x-declare: {type: topic} }," + "link:{" + "name: my-topic," + "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + "}" + "}"; - + // Using the ADDR method to create a more complicated topic Topic topic = ssn.createTopic(addr); MessageConsumer cons = ssn.createConsumer(topic); - + assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - + MessageProducer prod = ssn.createProducer(topic); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); @@ -860,7 +860,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -873,41 +873,41 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } } - + public void testQueueReceiversAndTopicSubscriber() throws Exception { Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); - + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = qSession.createReceiver(queue); - + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber sub = tSession.createSubscriber(topic); - + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); prod1.send(ssn.createTextMessage("test1")); - + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); prod2.send(ssn.createTextMessage("test2")); - + Message msg1 = receiver.receive(); assertNotNull(msg1); assertEquals("test1",((TextMessage)msg1).getText()); - + Message msg2 = sub.receive(); assertNotNull(msg2); - assertEquals("test2",((TextMessage)msg2).getText()); + assertEquals("test2",((TextMessage)msg2).getText()); } - + public void testDurableSubscriber() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; Properties props = new Properties(); @@ -916,19 +916,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; - props.setProperty("destination.address5", addrStr); - - Context ctx = new InitialContext(props); + props.setProperty("destination.address5", addrStr); + + Context ctx = new InitialContext(props); for (int i=1; i < 4; i++) { Topic topic = (Topic) ctx.lookup("address"+i); createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test"); } - + Topic topic = ssn.createTopic("ADDR:news.us"); createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us"); - + Topic namedQueue = (Topic) ctx.lookup("address5"); try { @@ -1001,10 +1001,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception - { + { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); - + Message m = ssn.createTextMessage(destName); prod.send(m); Message msg = cons.receive(1000); @@ -1012,12 +1012,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertEquals(destName,((TextMessage)msg).getText()); ssn.unsubscribe(destName); } - + public void testDeleteOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; @@ -1031,11 +1031,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - - + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; dest = new AMQAnyDestination(addr2); try @@ -1047,11 +1047,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; dest = new AMQAnyDestination(addr3); try @@ -1064,43 +1064,43 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); } - + /** * Test Goals : 1. Test if the client sets the correct accept mode for unreliable * and at-least-once. * 2. Test default reliability modes for Queues and Topics. * 3. Test if an exception is thrown if exactly-once is used. * 4. Test if an exception is thrown if at-least-once is used with topics. - * + * * Test Strategy: For goal #1 & #2 * For unreliable and at-least-once the test tries to receives messages * in client_ack mode but does not ack the messages. * It will then close the session, recreate a new session * and will then try to verify the queue depth. * For unreliable the messages should have been taken off the queue. - * For at-least-once the messages should be put back onto the queue. - * + * For at-least-once the messages should be put back onto the queue. + * */ - + public void testReliabilityOptions() throws Exception { String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; acceptModeTest(addr1,0); - + String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; acceptModeTest(addr2,2); - + // Default accept-mode for topics - acceptModeTest("ADDR:amq.topic/test",0); - + acceptModeTest("ADDR:amq.topic/test",0); + // Default accept-mode for queues acceptModeTest("ADDR:testQueue1;{create: always}",2); - - String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; + + String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; try { AMQAnyDestination dest = new AMQAnyDestination(addr3); @@ -1111,83 +1111,83 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } } - + private void acceptModeTest(String address, int expectedQueueDepth) throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons; MessageProducer prod; - + AMQDestination dest = new AMQAnyDestination(address); cons = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + for (int i=0; i < expectedQueueDepth; i++) { prod.send(ssn.createTextMessage("Msg" + i)); } - + for (int i=0; i < expectedQueueDepth; i++) { Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("Msg" + i,((TextMessage)msg).getText()); } - + ssn.close(); ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); - assertEquals(expectedQueueDepth,queueDepth); + long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); + assertEquals(expectedQueueDepth,queueDepth); cons.close(); - prod.close(); + prod.close(); } - + public void testDestinationOnSend() throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); - + Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("A",((TextMessage)msg).getText()); prod.close(); cons.close(); } - + public void testReplyToWithNamelessExchange() throws Exception { System.setProperty("qpid.declare_exchanges","false"); replyToTest("ADDR:my-queue;{create: always}"); System.setProperty("qpid.declare_exchanges","true"); } - + public void testReplyToWithCustomExchange() throws Exception { replyToTest("ADDR:hello;{create:always,node:{type:topic}}"); } - + private void replyToTest(String replyTo) throws Exception { - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination replyToDest = AMQDestination.createDestination(replyTo); MessageConsumer replyToCons = session.createConsumer(replyToDest); - + Destination dest = session.createQueue("ADDR:amq.direct/test"); - + MessageConsumer cons = session.createConsumer(dest); MessageProducer prod = session.createProducer(dest); Message m = session.createTextMessage("test"); m.setJMSReplyTo(replyToDest); prod.send(m); - + Message msg = cons.receive(); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - + Message m1 = replyToCons.receive(); assertNotNull("The reply to consumer should have received the messsage",m1); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index 626592dc10..5dcf678510 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -207,21 +207,21 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } assertTrue("No exception thrown!", caught); caught = false; - + } - + public void testRuntimeSelectorError() throws JMSException { Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1"); MessageProducer producer = session.createProducer(_destination); Message sentMsg = session.createTextMessage(); - + sentMsg.setIntProperty("testproperty", 1); // 1 % 5 producer.send(sentMsg); Message recvd = consumer.receive(RECIEVE_TIMEOUT); assertNotNull(recvd); - + sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense producer.send(sentMsg); try @@ -231,47 +231,47 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } catch (Exception e) { - + } assertFalse("Connection should not be closed", _connection.isClosed()); } - + public void testSelectorWithJMSMessageID() throws Exception { Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); - + MessageProducer prod = session.createProducer(_destination); MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL"); - + for (int i=0; i<2; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } session.commit(); - + Message msg1 = consumer.receive(1000); Message msg2 = consumer.receive(1000); - + Assert.assertNotNull("Msg1 should not be null", msg1); Assert.assertNotNull("Msg2 should not be null", msg2); - + session.commit(); - + prod.setDisableMessageID(true); - - for (int i=0; i<2; i++) + + for (int i=2; i<4; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } - + session.commit(); - Message msg3 = consumer.receive(1000); + Message msg3 = consumer.receive(1000); Assert.assertNull("Msg3 should be null", msg3); session.commit(); consumer = session.createConsumer(_destination,"JMSMessageID IS NULL"); - + Message msg4 = consumer.receive(1000); Message msg5 = consumer.receive(1000); session.commit(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index e861b4f4ee..f8ab593c88 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -56,19 +56,19 @@ public class StreamMessageTest extends QpidBrokerTestCase public void testStreamMessageEOF() throws Exception { - Connection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL( ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); ft.setString("x-match", "any"); ft.setString("F1000", "1"); - MessageConsumer consumer = - consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); - + consumerSession.declareAndBind(queue, ft); + MessageConsumer consumer = consumerSession.createConsumer(queue); // force synch to ensure the consumer has resulted in a bound queue // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); // This is the default now diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 5dae98fe21..6bf20d7708 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.JMSException; +import javax.naming.NamingException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -37,6 +40,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import org.apache.qpid.url.URLSyntaxException; /** @author Apache Software Foundation */ @@ -225,6 +229,44 @@ public class TopicSessionTest extends QpidBrokerTestCase AMQTopic topic = new AMQTopic(con, "testNoLocal"); + noLocalTest(con, topic); + + + con.close(); + } + + + public void testNoLocalDirectExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("direct://amq.direct/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + + con.close(); + } + + + + public void testNoLocalFanoutExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("fanout://amq.fanout/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + con.close(); + } + + + private void noLocalTest(AMQConnection con, AMQTopic topic) + throws JMSException, URLSyntaxException, AMQException, NamingException + { TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); @@ -304,9 +346,6 @@ public class TopicSessionTest extends QpidBrokerTestCase //test nolocal subscriber does message m = (TextMessage) noLocal.receive(1000); assertNotNull(m); - - - con.close(); con2.close(); } |