summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-01 19:24:36 +0000
commita74a60f8fe5d15deb665cae72af8bae110e195d9 (patch)
treee0b5a10c8c765bc52d5abc4c1674dc9da61cdffe /java
parent1a88e5743df4f7eb18fe06cfbe56e797d6bca6ea (diff)
downloadqpid-python-a74a60f8fe5d15deb665cae72af8bae110e195d9.tar.gz
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java127
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java190
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java258
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java105
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java289
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java633
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java97
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java276
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java115
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java146
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java62
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java546
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java34
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java45
26 files changed, 1501 insertions, 1661 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 0d05307cb4..58c2b33041 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange
return getVirtualHost().getQueueRegistry();
}
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
}
+ public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return (b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments);
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey, AMQQueue queue)
+ {
+ return isBound(routingKey==null ? "" : routingKey.asString(), queue);
+ }
+
+ public final boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey)
+ {
+ return isBound(routingKey == null ? "" : routingKey.asString());
+ }
+
+ public final boolean isBound(String bindingKey)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
- public boolean isBound(String bindingKey, AMQQueue queue)
+ @Override
+ public final boolean isBound(Map<String, Object> arguments, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue() &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public final boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ for(Binding b : _bindings)
+ {
+ if(b.getBindingKey().equals(bindingKey) &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
}
- public boolean isBound(String bindingKey)
+ public final boolean hasBindings()
{
- return isBound(new AMQShortString(bindingKey));
+ return !_bindings.isEmpty();
}
public Exchange getAlternateExchange()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 4e136965a1..ccf955ed1c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -272,6 +272,18 @@ public class DefaultExchange implements Exchange
}
@Override
+ public boolean isBound(Map<String, Object> arguments, AMQQueue queue)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(queue);
+ }
+
+ @Override
+ public boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(bindingKey);
+ }
+
+ @Override
public boolean isBound(String bindingKey)
{
return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index fc6ce15bc4..2e2a93d638 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -20,9 +20,18 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class DirectExchange extends AbstractExchange
{
+
+ private static final Logger _logger = Logger.getLogger(DirectExchange.class);
+
private static final class BindingSet
{
private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
- private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+ private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
+ private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
public synchronized void addBinding(Binding binding)
{
@@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+ Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
for(Binding b : _bindings)
{
- if(!queues.contains(b.getQueue()))
+
+ if(FilterSupport.argumentsContainFilter(b.getArguments()))
{
- queues.add(b.getQueue());
+ try
+ {
+ MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue());
+ filteredQueues.put(b.getQueue(),filter);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName()
+ + "' to exchange '" + b.getExchange().getName()
+ + "' with arguments: " + b.getArguments(), e);
+ }
+
+ }
+ else
+ {
+
+ if(!queues.contains(b.getQueue()))
+ {
+ queues.add(b.getQueue());
+ }
}
}
- _queues = queues;
+ _unfilteredQueues = queues;
+ _filteredQueues = filteredQueues;
}
- public List<BaseQueue> getQueues()
+ public List<BaseQueue> getUnfilteredQueues()
{
- return _queues;
+ return _unfilteredQueues;
}
public CopyOnWriteArraySet<Binding> getBindings()
{
return _bindings;
}
+
+ public boolean hasFilteredQueues()
+ {
+ return !_filteredQueues.isEmpty();
+ }
+
+ public Map<BaseQueue,MessageFilter> getFilteredQueues()
+ {
+ return _filteredQueues;
+ }
}
private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
@@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange
if(bindings != null)
{
- return bindings.getQueues();
+ List<BaseQueue> queues = bindings.getUnfilteredQueues();
+
+ if(bindings.hasFilteredQueues())
+ {
+ Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+
+ Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
+ for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+ {
+ if(!queuesSet.contains(entry.getKey()))
+ {
+ MessageFilter filter = entry.getValue();
+ if(filter.matches(payload))
+ {
+ queuesSet.add(entry.getKey());
+ }
+ }
+ }
+ if(queues.size() != queuesSet.size())
+ {
+ queues = new ArrayList<BaseQueue>(queuesSet);
+ }
+ }
+ return queues;
}
else
{
@@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange
}
-
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey,queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- if(bindings != null)
- {
- return bindings.getQueues().contains(queue);
- }
- return false;
-
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.getQueues().isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
-
- for (BindingSet bindings : _bindingsByKey.values())
- {
- if(bindings.getQueues().contains(queue))
- {
- return true;
- }
-
- }
- return false;
- }
-
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
}
protected void onBind(final Binding binding)
@@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index a5a1d7f912..d483c3b29b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer
Collection<Binding> getBindings();
+ boolean isBound(String bindingKey);
boolean isBound(String bindingKey, AMQQueue queue);
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
+ boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
- boolean isBound(String bindingKey);
+ boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+
+ boolean isBound(String bindingKey, Map<String, Object> arguments);
void removeReference(ExchangeReferrer exchange);
@@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer
boolean hasReferrers();
+
+
public interface BindingListener
{
void bindingAdded(Exchange exchange, Binding binding);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 6ad5eb261e..cd830d69a9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -20,11 +20,21 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange
/**
* Maps from queue name to queue instances
*/
- private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>();
+ private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>();
+ private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+ private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+
+ private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings =
+ new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>();
+ {
+ Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap();
+ _filteredBindings.set(emptyMap);
+ }
+
+
public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType();
@@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + _queues);
- }
-
for(Binding b : getBindings())
{
b.incrementMatches();
}
- return new ArrayList<BaseQueue>(_queues.keySet());
-
- }
+ final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey, queue);
- }
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(queue);
- }
+ final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get();
+ if(!_filteredQueues.isEmpty())
+ {
+ for(AMQQueue q : _filteredQueues)
+ {
+ final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ if(!(bindingMessageFilterMap == null || result.contains(q)))
+ {
+ for(MessageFilter filter : bindingMessageFilterMap.values())
+ {
+ if(filter.matches(payload))
+ {
+ result.add(q);
+ break;
+ }
+ }
+ }
+ }
- public boolean isBound(AMQShortString routingKey)
- {
+ }
- return (_queues != null) && !_queues.isEmpty();
- }
- public boolean isBound(AMQQueue queue)
- {
- if (queue == null)
+ if (_logger.isDebugEnabled())
{
- return false;
+ _logger.debug("Publishing message to queue " + result);
}
- return _queues.containsKey(queue);
- }
- public boolean hasBindings()
- {
- return !_queues.isEmpty();
+ return result;
+
}
- protected void onBind(final Binding binding)
+
+ protected synchronized void onBind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
assert queue != null;
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
+ {
- Integer oldVal;
+ Integer oldVal;
+ if(_queues.containsKey(queue))
+ {
+ _queues.put(queue,_queues.get(queue)+1);
+ }
+ else
+ {
+ _queues.put(queue, ONE);
+ _unfilteredQueues.add(queue);
+ // No longer any reason to check filters for this queue
+ _filteredQueues.remove(queue);
+ }
- if((oldVal = _queues.putIfAbsent(queue, ONE)) != null)
+ }
+ else
{
- Integer newVal = oldVal+1;
- while(!_queues.replace(queue, oldVal, newVal))
+ try
{
- oldVal = _queues.get(queue);
- if(oldVal == null)
+
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ final
+ MessageFilter messageFilter =
+ FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
+
+ if(bindingsForQueue != null)
{
- oldVal = _queues.putIfAbsent(queue, ONE);
- if(oldVal == null)
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.put(binding, messageFilter);
+ }
+ else
+ {
+ bindingsForQueue = Collections.singletonMap(binding, messageFilter);
+ if(!_unfilteredQueues.contains(queue))
{
- break;
+ _filteredQueues.add(queue);
}
}
- newVal = oldVal + 1;
+
+ filteredBindings.put(binding.getQueue(), bindingsForQueue);
+
+ _filteredBindings.set(filteredBindings);
+
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e);
+ return;
}
}
-
if (_logger.isDebugEnabled())
{
_logger.debug("Binding queue " + queue
- + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this);
+ + " with routing key " + binding.getBindingKey() + " to exchange " + this);
}
}
- protected void onUnbind(final Binding binding)
+ protected synchronized void onUnbind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
- Integer oldValue = _queues.get(queue);
-
- boolean done = false;
-
- while(!(done || oldValue == null))
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
{
- while(!(done || oldValue == null) && oldValue.intValue() == 1)
+ Integer oldValue = _queues.remove(queue);
+ if(ONE.equals(oldValue))
{
- if(!_queues.remove(queue, oldValue))
+ // should start checking filters for this queue
+ if(_filteredBindings.get().containsKey(queue))
{
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
+ _filteredQueues.add(queue);
}
+ _unfilteredQueues.remove(queue);
}
- while(!(done || oldValue == null) && oldValue.intValue() != 1)
+ else
{
- Integer newValue = oldValue - 1;
- if(!_queues.replace(queue, oldValue, newValue))
- {
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
- }
+ _queues.put(queue,oldValue-1);
}
}
+ else // we are removing a binding with filters
+ {
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ if(bindingsForQueue.size()>1)
+ {
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.remove(binding);
+ filteredBindings.put(binding.getQueue(),bindingsForQueue);
+ }
+ else
+ {
+ _filteredQueues.remove(queue);
+ }
+ _filteredBindings.set(filteredBindings);
+
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
new file mode 100644
index 0000000000..880c9a2cf6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.exchange;
+
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.Filterable;
+
+public class FilterSupport
+{
+ private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache =
+ Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>());
+
+ static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException
+ {
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
+ JMSSelectorFilter selector = null;
+
+ if(selectorRef == null || (selector = selectorRef.get())==null)
+ {
+ try
+ {
+ selector = new JMSSelectorFilter(selectorString);
+ }
+ catch (ParseException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (SelectorParsingException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (TokenMgrError e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
+ }
+ return selector;
+ }
+
+ static boolean argumentsContainFilter(final FieldTable args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainFilter(final Map<String, Object> args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainNoLocal(final Map<String, Object> args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString()));
+ }
+
+
+ static boolean argumentsContainNoLocal(final FieldTable args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
+ }
+
+
+ static boolean argumentsContainJMSSelector(final Map<String,Object> args)
+ {
+ return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String)
+ && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
+ }
+
+
+ static boolean argumentsContainJMSSelector(final FieldTable args)
+ {
+ return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
+ && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
+ }
+
+
+ static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static final class NoLocalFilter implements MessageFilter
+ {
+ private final AMQQueue _queue;
+
+ public NoLocalFilter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ InboundMessage inbound = (InboundMessage) message;
+ final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+ return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ NoLocalFilter that = (NoLocalFilter) o;
+
+ return _queue == null ? that._queue == null : _queue.equals(that._queue);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _queue != null ? _queue.hashCode() : 0;
+ }
+ }
+
+ static final class CompoundFilter implements MessageFilter
+ {
+ private MessageFilter _noLocalFilter;
+ private MessageFilter _jmsSelectorFilter;
+
+ public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+ {
+ _noLocalFilter = filter;
+ _jmsSelectorFilter = jmsSelectorFilter;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ CompoundFilter that = (CompoundFilter) o;
+
+ if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
+ return false;
+ }
+ if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+ result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+ return result;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index b6f5f973f4..eb4a84a5b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Defines binding and matching based on a set of headers.
@@ -44,13 +48,14 @@ class HeadersBinding
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
+ private MessageFilter _filter;
/**
* Creates a header binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- *
+ *
* @param binding the binding to create a header binding using
*/
public HeadersBinding(Binding binding)
@@ -66,9 +71,30 @@ class HeadersBinding
_mappings = null;
}
}
-
+
private void initMappings()
{
+ if(FilterSupport.argumentsContainFilter(_mappings))
+ {
+ try
+ {
+ _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
+ +"' to exchange '"+_binding.getExchange().getName()
+ +"' with arguments: " + _binding.getArguments());
+ _filter = new MessageFilter()
+ {
+ @Override
+ public boolean matches(Filterable message)
+ {
+ return false;
+ }
+ };
+ }
+ }
for(Map.Entry<String, Object> entry : _mappings.entrySet())
{
String propertyName = entry.getKey();
@@ -87,7 +113,7 @@ class HeadersBinding
}
}
}
-
+
public Binding getBinding()
{
return _binding;
@@ -111,6 +137,11 @@ class HeadersBinding
}
}
+ public boolean matches(InboundMessage message)
+ {
+ return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+ }
+
private boolean and(AMQMessageHeader headers)
{
if(headers.containsHeaders(required))
@@ -215,7 +246,7 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-
+
@Override
public boolean equals(final Object o)
{
@@ -250,4 +281,4 @@ class HeadersBinding
return true;
}
-} \ No newline at end of file
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 746c8ac6bc..9fb745d553 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
-
+
private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
-
+
private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
new CopyOnWriteArrayList<HeadersBinding>();
-
+
public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType();
public HeadersExchange()
@@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
+ _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader());
}
-
+
LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-
+
for (HeadersBinding hb : _bindingHeaderMatchers)
{
- if (hb.matches(header))
+ if (hb.matches(payload))
{
Binding b = hb.getBinding();
-
+
b.incrementMatches();
-
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- header + " to " + b.getQueue().getNameShortString());
+ payload.getMessageHeader() + " to " + b.getQueue().getNameShortString());
}
queues.add(b.getQueue());
}
}
-
- return new ArrayList<BaseQueue>(queues);
- }
-
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- CopyOnWriteArraySet<Binding> bindings;
- if(bindingKey == null)
- {
- bindings = new CopyOnWriteArraySet<Binding>(getBindings());
- }
- else
- {
- bindings = _bindingsByKey.get(bindingKey);
- }
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(queue == null || binding.getQueue().equals(queue))
- {
- return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments);
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- //fixme isBound here should take the arguements in to consideration.
- return isBound(routingKey, queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
+ return new ArrayList<BaseQueue>(queues);
}
protected void onBind(final Binding binding)
@@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange
bindings = newBindings;
}
}
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 6d548be508..9d41856dc0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -20,21 +20,15 @@
*/
package org.apache.qpid.server.exchange;
-import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.SelectorParsingException;
-import org.apache.qpid.filter.selector.ParseException;
-import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
@@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
public class TopicExchange extends AbstractExchange
{
@@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
-
public TopicExchange()
{
super(TYPE);
@@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange
AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
AMQQueue queue = binding.getQueue();
FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
-
+
assert queue != null;
assert rKey != null;
@@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
- createMessageFilter(oldArgs, queue),
- createMessageFilter(args, queue));
+ FilterSupport.createMessageFilter(oldArgs, queue),
+ FilterSupport.createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
+ result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
}
else
{
@@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange
return;
}
}
-
+
result.addBinding(binding);
}
@@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange
if(result == null)
{
result = new TopicExchangeResult();
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
@@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange
}
else
{
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
result.addUnfilteredQueue(queue);
}
}
-
+
result.addBinding(binding);
_bindings.put(binding, args);
}
}
- private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
- {
- if(argumentsContainNoLocal(args))
- {
- MessageFilter filter = new NoLocalFilter(queue);
-
- if(argumentsContainJMSSelector(args))
- {
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
- }
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
- }
-
- }
-
-
- private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
- {
- final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
- JMSSelectorFilter selector = null;
-
- if(selectorRef == null || (selector = selectorRef.get())==null)
- {
- try
- {
- selector = new JMSSelectorFilter(selectorString);
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
- }
- return selector;
- }
-
- private static boolean argumentsContainFilter(final FieldTable args)
- {
- return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
- }
-
- private static boolean argumentsContainNoLocal(final FieldTable args)
- {
- return args != null
- && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
- && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
- }
-
- private static boolean argumentsContainJMSSelector(final FieldTable args)
- {
- return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
- && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
- }
-
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange
}
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
-
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return o.equals(arguments);
- }
- else
- {
- return false;
- }
-
- }
- }
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, bindingKey, queue, this, arguments);
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return arguments.equals(FieldTable.convertToMap(o));
- }
- else
- {
- return false;
- }
- }
-
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(routingKey, null, queue);
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getBindingKey().equals(routingKey.toString()))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getQueue().equals(queue))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean hasBindings()
- {
- return !_bindings.isEmpty();
- }
-
private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
@@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange
FieldTable bindingArgs = _bindings.remove(binding);
AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
-
+
result.removeBinding(binding);
-
- if(argumentsContainFilter(bindingArgs))
+
+ if(FilterSupport.argumentsContainFilter(bindingArgs))
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
+ result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
+ binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange
deregisterQueue(binding);
}
- private static final class NoLocalFilter implements MessageFilter
- {
- private final AMQQueue _queue;
-
- public NoLocalFilter(AMQQueue queue)
- {
- _queue = queue;
- }
-
- public boolean matches(Filterable message)
- {
- InboundMessage inbound = (InboundMessage) message;
- final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
- return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
-
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- NoLocalFilter that = (NoLocalFilter) o;
-
- return _queue == null ? that._queue == null : _queue.equals(that._queue);
- }
-
- @Override
- public int hashCode()
- {
- return _queue != null ? _queue.hashCode() : 0;
- }
- }
-
- private static final class CompoundFilter implements MessageFilter
- {
- private MessageFilter _noLocalFilter;
- private MessageFilter _jmsSelectorFilter;
-
- public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
- {
- _noLocalFilter = filter;
- _jmsSelectorFilter = jmsSelectorFilter;
- }
-
- public boolean matches(Filterable message)
- {
- return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- CompoundFilter that = (CompoundFilter) o;
-
- if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
- {
- return false;
- }
- if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
- result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
- return result;
- }
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index b4eb41684d..2e6a98d81b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
else
{
+ String message = "Queue " + queueName + " not bound with routing key " +
+ body.getRoutingKey() + " to exchange " + exchangeName;
+
+ if(message.length()>255)
+ {
+ message = message.substring(0,254);
+ }
response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
+ new AMQShortString(message)); // replyText
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index d8d245e255..110c7be50a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate
if(queueMatched)
{
- result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+ final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue);
+ result.setKeyNotMatched(!keyMatched);
+ if(method.hasArguments() && keyMatched)
+ {
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue));
+ }
}
else
{
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
- if(method.hasArguments())
- {
- result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
- }
-
}
else if (method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
+ result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
}
}
@@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
if(method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
}
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
deleted file mode 100644
index f4c0fec6c9..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MockStoredMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AbstractHeadersExchangeTestBase extends QpidTestCase
-{
- private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
-
- private final HeadersExchange exchange = new HeadersExchange();
- private final Set<TestQueue> queues = new HashSet<TestQueue>();
- private VirtualHost _virtualHost;
- private int count;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testDoNothing()
- {
- // this is here only to make junit under Eclipse happy
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- protected TestQueue bindDefault(String... bindings) throws AMQException
- {
- String queueName = "Queue" + (++count);
-
- return bind(queueName, queueName, getHeadersMap(bindings));
- }
-
- protected void unbind(TestQueue queue, String... bindings) throws AMQException
- {
- String queueName = queue.getName();
- exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings)));
- }
-
- protected int getCount()
- {
- return count;
- }
-
- private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException
- {
- TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost);
- queues.add(queue);
- exchange.onBind(new Binding(null, key, queue, exchange, args));
- return queue;
- }
-
-
- protected int route(Message m) throws AMQException
- {
- m.getIncomingMessage().headersReceived(System.currentTimeMillis());
- m.route(exchange);
- if(m.getIncomingMessage().allContentReceived())
- {
- for(BaseQueue q : m.getIncomingMessage().getDestinationQueues())
- {
- q.enqueue(m);
- }
- }
- return m.getIncomingMessage().getDestinationQueues().size();
- }
-
- protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, false, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, expectReturn, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
- {
- int queueCount = route(m);
-
- for (TestQueue q : queues)
- {
- if (expected.contains(q))
- {
- assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
- }
- }
-
- if(expectReturn)
- {
- assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
- }
-
- }
-
- static Map<String,Object> getHeadersMap(String... entries)
- {
- if(entries == null)
- {
- return null;
- }
-
- Map<String,Object> headers = new HashMap<String,Object>();
-
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.put(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
- static FieldTable getHeaders(String... entries)
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.setObject(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
-
- static final class MessagePublishInfoImpl implements MessagePublishInfo
- {
- private AMQShortString _exchange;
- private boolean _immediate;
- private boolean _mandatory;
- private AMQShortString _routingKey;
-
- public MessagePublishInfoImpl(AMQShortString routingKey)
- {
- _routingKey = routingKey;
- }
-
- public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
- {
- _exchange = exchange;
- _immediate = immediate;
- _mandatory = mandatory;
- _routingKey = routingKey;
- }
-
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public boolean isImmediate()
- {
- return _immediate;
-
- }
-
- public boolean isMandatory()
- {
- return _mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
-
- public void setExchange(AMQShortString exchange)
- {
- _exchange = exchange;
- }
-
- public void setImmediate(boolean immediate)
- {
- _immediate = immediate;
- }
-
- public void setMandatory(boolean mandatory)
- {
- _mandatory = mandatory;
- }
-
- public void setRoutingKey(AMQShortString routingKey)
- {
- _routingKey = routingKey;
- }
- }
-
- static MessagePublishInfo getPublishRequest(final String id)
- {
- return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id));
- }
-
- static ContentHeaderBody getContentHeader(FieldTable headers)
- {
- ContentHeaderBody header = new ContentHeaderBody();
- header.setProperties(getProperties(headers));
- return header;
- }
-
- static BasicContentHeaderProperties getProperties(FieldTable headers)
- {
- BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
- properties.setHeaders(headers);
- return properties;
- }
-
- static class TestQueue extends SimpleAMQQueue
- {
- private final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
-
- public String toString()
- {
- return getNameShortString().toString();
- }
-
- public TestQueue(AMQShortString name, VirtualHost host) throws AMQException
- {
- super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP);
- host.getQueueRegistry().registerQueue(this);
- }
-
-
-
- /**
- * We override this method so that the default behaviour, which attempts to use a delivery manager, is
- * not invoked. It is unnecessary since for this test we only care to know whether the message was
- * sent to the queue; the queue processing logic is not being tested.
- * @param msg
- * @throws AMQException
- */
- @Override
- public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException
- {
- messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
- final QueueEntry queueEntry = new QueueEntry()
- {
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public AMQMessage getMessage()
- {
- return null;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return false;
- }
-
- public boolean expired() throws AMQException
- {
- return false;
- }
-
- public boolean isAvailable()
- {
- return false;
- }
-
- public boolean isAcquired()
- {
- return false;
- }
-
- public boolean acquire()
- {
- return false;
- }
-
- public boolean acquire(Subscription sub)
- {
- return false;
- }
-
- public boolean delete()
- {
- return false;
- }
-
- public boolean isDeleted()
- {
- return false;
- }
-
- public boolean acquiredBySubscription()
- {
- return false;
- }
-
- public boolean isAcquiredBy(Subscription subscription)
- {
- return false;
- }
-
- public void release()
- {
-
- }
-
- public void setRedelivered()
- {
-
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return null;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- public boolean isRedelivered()
- {
- return false;
- }
-
- public Subscription getDeliveredSubscription()
- {
- return null;
- }
-
- public void reject()
- {
-
- }
-
- public boolean isRejectedBy(long subscriptionId)
- {
- return false;
- }
-
- public void dequeue()
- {
-
- }
-
- public void dispose()
- {
-
- }
-
- public void discard()
- {
-
- }
-
- public void routeToAlternate()
- {
-
- }
-
- public boolean isQueueDeleted()
- {
- return false;
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
-
- }
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
- return false;
- }
-
- public int compareTo(final QueueEntry o)
- {
- return 0;
- }
-
- public boolean isDequeued()
- {
- return false;
- }
-
- public boolean isDispensed()
- {
- return false;
- }
-
- public QueueEntry getNextNode()
- {
- return null;
- }
-
- public QueueEntry getNextValidEntry()
- {
- return null;
- }
-
- public int getDeliveryCount()
- {
- return 0;
- }
-
- public void incrementDeliveryCount()
- {
- }
-
- public void decrementDeliveryCount()
- {
- }
- };
-
- if(action != null)
- {
- action.onEnqueue(queueEntry);
- }
-
- }
-
- boolean isInQueue(Message msg)
- {
- return messages.contains(msg);
- }
-
- }
-
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends AMQMessage
- {
- private static AtomicLong _messageId = new AtomicLong();
-
- private class TestIncomingMessage extends IncomingMessage
- {
-
- public TestIncomingMessage(final long messageId,
- final MessagePublishInfo info,
- final AMQProtocolSession publisher)
- {
- super(info);
- }
-
-
- public ContentHeaderBody getContentHeader()
- {
- return Message.this.getContentHeaderBody();
- }
- }
-
- private IncomingMessage _incoming;
-
-
- Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
- {
- this(protocolSession, id, getHeaders(headers));
- }
-
- Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
- {
- this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
- }
-
- public IncomingMessage getIncomingMessage()
- {
- return _incoming;
- }
-
- private Message(AMQProtocolSession protocolsession, long messageId,
- MessagePublishInfo publish,
- ContentHeaderBody header,
- List<ContentBody> bodies) throws AMQException
- {
- super(new MockStoredMessage(messageId, publish, header));
-
- StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
-
- int pos = 0;
- for(ContentBody body : bodies)
- {
- storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload()));
- pos += body.getPayload().length;
- }
-
- _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
- _incoming.setContentHeaderBody(header);
-
-
- }
-
-
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg.getStoredMessage());
- }
-
-
-
- void route(Exchange exchange) throws AMQException
- {
- _incoming.enqueue(exchange.route(_incoming));
- }
-
-
- public int hashCode()
- {
- return getKey().hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
- }
-
- private boolean equals(HeadersExchangeTest.Message m)
- {
- return getKey().equals(m.getKey());
- }
-
- public String toString()
- {
- return getKey().toString();
- }
-
- private Object getKey()
- {
- try
- {
- return getMessagePublishInfo().getRoutingKey();
- }
- catch (AMQException e)
- {
- _log.error("Error getting routing key: " + e, e);
- return null;
- }
- }
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 2ddb417d5d..7b7e2ec346 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -21,22 +21,32 @@
package org.apache.qpid.server.exchange;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class FanoutExchangeTest extends TestCase
{
@@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
_exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
}
@@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, (FieldTable) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue));
}
public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), queue));
}
public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
@@ -95,9 +107,86 @@ public class FanoutExchangeTest extends TestCase
private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
{
+ AMQQueue queue = mockQueue();
+ _exchange.addBinding("matters", queue, null);
+ return queue;
+ }
+
+ private AMQQueue mockQueue()
+ {
AMQQueue queue = mock(AMQQueue.class);
when(queue.getVirtualHost()).thenReturn(_virtualHost);
- _exchange.addBinding("does not matter", queue, null);
return queue;
}
+
+ public void testRoutingWithSelectors() throws Exception
+ {
+ AMQQueue queue1 = mockQueue();
+ AMQQueue queue2 = mockQueue();
+
+ _exchange.addBinding("key",queue1, null);
+ _exchange.addBinding("key",queue2, null);
+
+
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+ _exchange.removeBinding("key",queue2,null);
+
+ result = _exchange.route(mockMessage(true));
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ result = _exchange.route(mockMessage(false));
+
+ assertEquals("Expected message to be routed to queue1 only", 1, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
+
+ _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+
+
+ result = _exchange.route(mockMessage(false));
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+ assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+ }
+
+ private InboundMessage mockMessage(boolean val)
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader("select")).thenReturn(true);
+ when(header.getHeader("select")).thenReturn(val);
+ when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+ when(header.containsHeaders(anySet())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return names.size() == 1 && names.contains("select");
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index bd6a02d69b..2b965358e0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -20,106 +20,230 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import junit.framework.TestCase;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class HeadersExchangeTest extends TestCase
{
- private AMQProtocolSession _protocolSession;
+ private HeadersExchange _exchange;
+ private VirtualHost _virtualHost;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock());
+
+ CurrentActor.setDefault(mock(LogActor.class));
+ _exchange = new HeadersExchange();
+ _virtualHost = mock(VirtualHost.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+
}
- @Override
- public void tearDown() throws Exception
+ protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
{
- BrokerTestHelper.tearDown();
- super.tearDown();
+ List<? extends BaseQueue> results = _exchange.route(msg);
+ List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+ unexpected.removeAll(Arrays.asList(expected));
+ assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
+ List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+ missing.removeAll(results);
+ assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
+ assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
}
- public void testSimple() throws AMQException
+
+ private AMQQueue createAndBind(final String name, String... arguments)
+ throws Exception
{
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
- TestQueue q4 = bindDefault("F0001=Bear");
- TestQueue q5 = bindDefault("F0000", "F0001");
- TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
- TestQueue q7 = bindDefault("F0000", "F0001=Bear");
- TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"),
- q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
-
- Message m7 = new Message(_protocolSession, "Message7", "XXXXX");
-
- MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
- pb7.setMandatory(true);
- routeAndTest(m7,true);
-
- Message m8 = new Message(_protocolSession, "Message8", "F0000");
- MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
- pb8.setMandatory(true);
- routeAndTest(m8,false,q1);
+ return createAndBind(name, getArgsMapFromStrings(arguments));
+ }
+
+ private Map<String, Object> getArgsMapFromStrings(String... arguments)
+ {
+ Map<String, Object> map = new HashMap<String,Object>();
+
+ for(String arg : arguments)
+ {
+ if(arg.contains("="))
+ {
+ String[] keyValue = arg.split("=",2);
+ map.put(keyValue[0],keyValue[1]);
+ }
+ else
+ {
+ map.put(arg,null);
+ }
+ }
+ return map;
+ }
+ private AMQQueue createAndBind(final String name, Map<String, Object> arguments)
+ throws Exception
+ {
+ AMQQueue q = create(name);
+ bind(name, arguments, q);
+ return q;
+ }
+ private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q)
+ throws AMQSecurityException, AMQInternalException
+ {
+ _exchange.addBinding(bindingKey,q,arguments);
}
- public void testAny() throws AMQException
+ private AMQQueue create(String name)
{
- TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
- TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
- TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
- TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
- TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
+ AMQQueue q = mock(AMQQueue.class);
+ when(q.toString()).thenReturn(name);
+ when(q.getVirtualHost()).thenReturn(_virtualHost);
+ return q;
}
- public void testMandatory() throws AMQException
+
+ public void testSimple() throws Exception
{
- bindDefault("F0000");
- Message m1 = new Message(_protocolSession, "Message1", "XXXXX");
- Message m2 = new Message(_protocolSession, "Message2", "F0000");
- MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
- pb1.setMandatory(true);
- MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
- pb2.setMandatory(true);
- routeAndTest(m1,true);
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+ AMQQueue q4 = createAndBind("Q4", "F0001=Bear");
+ AMQQueue q5 = createAndBind("Q5", "F0000", "F0001");
+ AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear");
+ AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear");
+ AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+
}
-
- public void testOnUnbind() throws AMQException
+
+ public void testAny() throws Exception
{
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
-
- unbind(q1,"F0000");
- routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2);
+ AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any");
+ AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any");
+ AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
+ AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+ }
+
+ public void testOnUnbind() throws Exception
+ {
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+
+ _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000"));
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+ }
+
+
+ public void testWithSelectors() throws Exception
+ {
+ AMQQueue q1 = create("Q1");
+ AMQQueue q2 = create("Q2");
+ bind("q1",getArgsMapFromStrings("F"), q1);
+ bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
+ bind("q2",getArgsMapFromStrings("F=1"), q2);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+
+
+ AMQQueue q3 = create("Q3");
+ bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
+ bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+
+ }
+
+ private InboundMessage mockMessage(final Map<String, Object> headerValues)
+ {
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader(anyString())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.containsKey((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeader(anyString())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.get((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+ when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return headerValues.keySet().containsAll(names);
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
}
-
public static junit.framework.Test suite()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
index b9e9a33cd6..922cc1e2a7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
@@ -31,7 +31,7 @@ public class AMQHeadersExchange extends AMQDestination
{
public AMQHeadersExchange(BindingURL binding)
{
- this(binding.getExchangeName());
+ super(binding);
}
public AMQHeadersExchange(String name)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e784e903fa..018a1ec851 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -440,7 +440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// If the session has been closed don't waste time creating a thread to do
// flow control
if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
+ {
// Only execute change if previous state
// was False
if (!_suspendState.getAndSet(true))
@@ -535,7 +535,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract AMQException getLastException();
-
+
public void checkNotClosed() throws JMSException
{
try
@@ -553,7 +553,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ssnClosed.setLinkedException(ex);
ssnClosed.initCause(ex);
throw ssnClosed;
- }
+ }
else
{
throw ise;
@@ -987,13 +987,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
return createDurableSubscriber(topic, name, null, false);
}
-
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
throws JMSException
{
checkNotClosed();
Topic origTopic = checkValidTopic(topic, true);
-
+
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
if (dest.getDestSyntax() == DestSyntax.ADDR &&
!dest.isAddressResolved())
@@ -1015,20 +1015,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throw toJMSException("Error when verifying destination", e);
}
}
-
+
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
-
+
_subscriberDetails.lock();
try
{
TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
-
+
// Not subscribed to this name in the current session
if (subscriber == null)
{
// After the address is resolved routing key will not be null.
AMQShortString topicName = dest.getRoutingKey();
-
+
if (_strictAMQP)
{
if (_strictAMQPFATAL)
@@ -1046,8 +1046,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
else
{
Map<String,Object> args = new HashMap<String,Object>();
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
// durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
@@ -1060,16 +1060,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
- boolean isQueueBoundForTopicAndSelector =
+ boolean isQueueBoundForTopicAndSelector =
isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
deleteQueue(dest.getAMQQueueName());
}
+ else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match
+ {
+ try
+ {
+ bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(),
+ FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true);
+ }
+ catch(AMQException e)
+ {
+ throw toJMSException("Error when checking binding",e);
+ }
+ }
}
}
- else
+ else
{
// Subscribed with the same topic and no current / previous or same selector
if (subscriber.getTopic().equals(topic)
@@ -1100,7 +1112,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_subscriberAccess.unlock();
}
-
+
return subscriber;
}
catch (TransportException e)
@@ -1193,19 +1205,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (syntax == AMQDestination.DestSyntax.BURL)
{
// For testing we may want to use the prefix
- return new AMQQueue(getDefaultQueueExchangeName(),
+ return new AMQQueue(getDefaultQueueExchangeName(),
new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
}
else
{
AMQQueue queue = new AMQQueue(queueName);
return queue;
-
+
}
}
else
{
- return new AMQQueue(queueName);
+ return new AMQQueue(queueName);
}
}
catch (URISyntaxException urlse)
@@ -1341,7 +1353,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return new QueueReceiverAdaptor(dest, consumer);
}
-
+
private Queue validateQueue(Destination dest) throws InvalidDestinationException
{
if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
@@ -1497,9 +1509,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- return new AMQTopic(topicName);
+ return new AMQTopic(topicName);
}
-
+
}
catch (URISyntaxException urlse)
{
@@ -1646,16 +1658,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug("Message[" + message.toString() + "] received in session");
}
_highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
throws
AMQException
{
+ declareAndBind(amqd, new FieldTable());
+ }
+
+
+ public void declareAndBind(AMQDestination amqd, FieldTable arguments)
+ throws
+ AMQException
+ {
declareExchange(amqd, false);
AMQShortString queueName = declareQueue(amqd, false);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd);
}
/**
@@ -1681,7 +1701,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Not that this does not necessarily mean that the recovery has failed, but simply that it is
* not possible to tell if it has or not.
* @todo Be aware of possible changes to parameter order as versions change.
- *
+ *
* Strategy for handling recover.
* Flush any acks not yet sent.
* Stop the message flow.
@@ -1730,7 +1750,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
sendRecover();
-
+
markClean();
if (!isSuspended)
@@ -1755,7 +1775,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected abstract void sendRecover() throws AMQException, FailoverException;
protected abstract void flushAcknowledgments();
-
+
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -1851,7 +1871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setMessageListener(MessageListener listener) throws JMSException
{
}
-
+
/**
* @see #unsubscribe(String, boolean)
*/
@@ -1866,20 +1886,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
}
}
-
+
/**
* Unsubscribe from a subscription.
- *
+ *
* @param name the name of the subscription to unsubscribe
* @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
* queue is not bound, possibly due to the subscription being closed.
- * @throws JMSException on
+ * @throws JMSException on
* @throws InvalidDestinationException
*/
private void unsubscribe(String name, boolean safe) throws JMSException
{
TopicSubscriberAdaptor<C> subscriber;
-
+
_subscriberDetails.lock();
try
{
@@ -1896,11 +1916,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_subscriberDetails.unlock();
}
-
+
if (subscriber != null)
{
subscriber.close();
-
+
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
@@ -1917,7 +1937,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+ " Requesting queue deletion regardless.");
}
-
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else // Queue Browser
@@ -1936,8 +1956,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose) throws JMSException
+ final int prefetchLow, final boolean noLocal,
+ final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -2111,7 +2132,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws JMSException;
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
-
+
public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
/**
@@ -2844,14 +2865,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
declareExchange(amqd, nowait);
}
-
+
if (_delareQueues || amqd.isNameRequired())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
- bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
+ if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
+ {
+ bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
+ amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
+ }
+
}
-
+
AMQShortString queueName = amqd.getAMQQueueName();
// store the consumer queue name
@@ -2895,10 +2921,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+ throws AMQException;
+
public abstract void resolveAddress(AMQDestination dest,
boolean isConsumer,
boolean noLocal) throws AMQException;
-
+
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(new Long(producerId), producer);
@@ -3189,7 +3218,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
-
+
public void run()
{
if (_dispatcherLogger.isDebugEnabled())
@@ -3304,7 +3333,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (updateRollbackMark(current, deliveryTag))
{
_rollbackMark.compareAndSet(current, deliveryTag);
- }
+ }
}
private void notifyConsumer(UnprocessedMessage message)
@@ -3424,7 +3453,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return super.isClosing() || _connection.isClosing();
}
-
+
public boolean isDeclareExchanges()
{
return _declareExchanges;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index e2cfe0e27f..1baaff738b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -143,7 +143,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
private RangeSet unacked = RangeSetFactory.createRangeSet();
- private int unackedCount = 0;
+ private int unackedCount = 0;
/**
* Used to store the range of in tx messages
@@ -292,7 +292,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
flushAcknowledgments(false);
}
-
+
void flushAcknowledgments(boolean setSyncBit)
{
synchronized (unacked)
@@ -310,7 +310,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
messageAcknowledge(ranges,accept,false);
}
-
+
void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
final Session ssn = getQpidSession();
@@ -354,15 +354,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
-
+
for (AMQShortString rk: destination.getBindingKeys())
{
- _logger.debug("Binding queue : " + queueName.toString() +
- " exchange: " + exchangeName.toString() +
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
" using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(),
- exchangeName.toString(),
- rk.toString(),
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
args);
}
}
@@ -371,10 +371,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
List<Binding> bindings = new ArrayList<Binding>();
bindings.addAll(destination.getNode().getBindings());
-
+
String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
destination.getAddressName(): "amq.topic";
-
+
for (Binding binding: bindings)
{
// Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
@@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
String queue = binding.getQueue() == null?
queueName.asString(): binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
+
+ String exchange = binding.getExchange() == null ?
defaultExchange :
binding.getExchange();
-
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
+ getQpidSession().exchangeBind(queue,
exchange,
binding.getBindingKey(),
- binding.getArgs());
+ binding.getArgs());
}
}
-
+
if (!nowait)
{
// We need to sync so that we get notify of an error.
@@ -561,20 +561,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
{
return isQueueBound(exchangeName,queueName,routingKey,null);
}
- public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ public boolean isQueueBound(final AMQDestination destination)
{
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
}
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
- throws JMSException
{
- String rk = null;
+ String rk = null;
if (bindingKeys != null && bindingKeys.length>0)
{
rk = bindingKeys[0].toString();
@@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
rk = routingKey.toString();
}
-
+
return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
}
-
+
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
{
boolean res;
@@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
else
- {
+ {
if (args == null)
{
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
.getQueueNotMatched());
}
else
{
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
.getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
}
}
return res;
}
+ @Override
+ protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+ {
+ return isQueueBound(exchangeName, amqQueueName, routingKey);
+ }
+
/**
* This method is invoked when a consumer is created
* Registers the consumer with the broker
@@ -730,7 +734,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
/**
- * deletes an exchange
+ * deletes an exchange
*/
public void sendExchangeDelete(final String name, final boolean nowait)
throws AMQException, FailoverException
@@ -763,12 +767,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
if (amqd.getDestSyntax() == DestSyntax.BURL)
- {
+ {
Map<String,Object> arguments = new HashMap<String,Object>();
if (noLocal)
- {
+ {
arguments.put(AddressHelper.NO_LOCAL, true);
- }
+ }
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
@@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
arguments,
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
- node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
// passive --> false
@@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
long capacity = consumer.getCapacity();
-
+
if (capacity == 0)
{
if (consumer.getMessageListener() != null)
@@ -1090,20 +1094,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return AMQMessageDelegateFactory.FACTORY_0_10;
}
-
+
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
- match = !result.getNotFound();
+ match = !result.getNotFound();
Node node = dest.getNode();
-
+
if (match)
{
if (assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
- (node.getExchangeType() != null &&
+ match = (result.getDurable() == node.isDurable()) &&
+ (node.getExchangeType() != null &&
node.getExchangeType().equals(result.getType())) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
}
@@ -1125,7 +1129,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
-
+
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1137,7 +1141,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (match && assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
+ match = (result.getDurable() == node.isDurable()) &&
(result.getAutoDelete() == node.isAutoDelete()) &&
(result.getExclusive() == node.isExclusive()) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
@@ -1165,17 +1169,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
return match;
}
-
+
private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
{
boolean match = true;
for (String key: source.keySet())
{
- match = target.containsKey(key) &&
+ match = target.containsKey(key) &&
target.get(key).equals(source.get(key));
-
- if (!match)
- {
+
+ if (!match)
+ {
StringBuffer buf = new StringBuffer();
buf.append("Property given in address did not match with the args sent by the broker.");
buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
@@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
}
-
+
return match;
}
/**
* 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
+ * 2. if type == queue,
* 2.1 verify queue exists or create if create == true
* 2.2 If not throw exception
- *
+ *
* 3. if type == exchange,
* 3.1 verify exchange exists or create if create == true
* 3.2 if not throw exception
* 3.3 if exchange exists (or created) create subscription queue.
*/
-
+
@SuppressWarnings("deprecation")
public void resolveAddress(AMQDestination dest,
boolean isConsumer,
@@ -1211,21 +1215,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
+ boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
(isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
+
boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
(isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
+
+
+
int type = resolveAddressType(dest);
-
+
switch (type)
{
- case AMQDestination.QUEUE_TYPE:
+ case AMQDestination.QUEUE_TYPE:
{
if(createNode)
{
@@ -1239,24 +1243,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
break;
}
}
-
- case AMQDestination.TOPIC_TYPE:
+
+ case AMQDestination.TOPIC_TYPE:
{
if(createNode)
- {
+ {
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
handleExchangeNodeCreation(dest);
break;
}
else if (isExchangeExist(dest,assertNode))
- {
+ {
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
break;
}
}
-
+
default:
throw new AMQException(
"The name '" + dest.getAddressName() +
@@ -1265,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setAddressResolved(System.currentTimeMillis());
}
}
-
+
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1292,14 +1296,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
dest.setAddressType(type);
return type;
- }
+ }
}
-
+
private void verifySubject(AMQDestination dest) throws AMQException
{
if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
{
-
+
if ("topic".equals(dest.getExchangeClass().toString()))
{
dest.setRoutingKey(new AMQShortString("#"));
@@ -1364,12 +1368,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// legacy support
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
+ dest.setExchangeClass(node.getExchangeType() == null?
ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
- new AMQShortString(node.getExchangeType()));
+ new AMQShortString(node.getExchangeType()));
dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
-
+
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1412,7 +1416,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
getPrefetchedMessageTags().addAll(tags);
}
-
+
RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 3097b33da3..9a9da62f2a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// thread.
// We can't close the session if we are already in the process of
// closing/closed the connection.
-
+
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
|| getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
{
@@ -381,10 +381,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
public AMQMethodEvent execute() throws AMQException, FailoverException
{
- AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
- (exchangeName, routingKey, queueName).generateFrame(getChannelId());
-
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ return sendExchangeBound(exchangeName, routingKey, queueName);
}
}, getAMQConnection()).execute();
@@ -398,7 +395,38 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
+ }
+
+ @Override
+ protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws AMQException
+ {
+
+ AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ return sendExchangeBound(exchangeName, routingKey, queueName);
+
+ }
+ }, getAMQConnection()).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ return (responseBody.getReplyCode() == 0);
+ }
+
+ private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
+ AMQShortString routingKey,
+ AMQShortString queueName) throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
+ (exchangeName, routingKey, queueName).generateFrame(getChannelId());
+
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ }
@Override
public void sendConsume(BasicMessageConsumer_0_8 consumer,
@@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
JMSException ex = new JMSException("Error creating producer");
ex.initCause(e);
ex.setLinkedException(e);
-
+
throw ex;
}
}
@@ -609,7 +637,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
-
+
return null;
}
}, getAMQConnection()).execute();
@@ -671,7 +699,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
false,
null).generateFrame(getChannelId());
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
return okHandler.getMessageCount();
}
@@ -689,9 +717,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
return AMQMessageDelegateFactory.FACTORY_0_8;
}
-
+
public void sync() throws AMQException
- {
+ {
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
@@ -702,10 +730,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
-
+
protected void flushAcknowledgments()
{
-
+
}
@Override
@@ -744,7 +772,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
AMQStateManager manager = getProtocolHandler().getStateManager();
-
+
Exception e = manager.getLastException();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
&& e != null)
@@ -752,15 +780,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (e instanceof AMQException)
{
return (AMQException) e;
- }
+ }
else
{
AMQException amqe = new AMQException(AMQConstant
- .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
e.getMessage(), e.getCause());
return amqe;
}
- }
+ }
else
{
return null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 96cd209447..d78e725a5d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -42,7 +42,7 @@ public class AMQTopic extends AMQDestination implements Topic
{
super(address);
}
-
+
protected AMQTopic()
{
super();
@@ -89,6 +89,12 @@ public class AMQTopic extends AMQDestination implements Topic
super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable);
}
+
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable);
+ }
+
protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
@@ -114,10 +120,10 @@ public class AMQTopic extends AMQDestination implements Topic
AMQTopic t = new AMQTopic(qpidTopic.getAddress());
AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
// link is never null if dest was created using an address string.
- t.getLink().setName(queueName.asString());
+ t.getLink().setName(queueName.asString());
t.getLink().getSubscriptionQueue().setAutoDelete(false);
t.getLink().setDurable(true);
-
+
// The legacy fields are also populated just in case.
t.setQueueName(queueName);
t.setAutoDelete(false);
@@ -134,7 +140,7 @@ public class AMQTopic extends AMQDestination implements Topic
}
else
{
- return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+ return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getExchangeClass(), qpidTopic.getRoutingKey(), false,
getDurableTopicQueueName(subscriptionName, connection),
true);
}
@@ -165,7 +171,7 @@ public class AMQTopic extends AMQDestination implements Topic
return null;
}
}
-
+
@Override
public AMQShortString getExchangeName()
{
@@ -181,9 +187,9 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQShortString getRoutingKey()
{
- if (super.getRoutingKey() != null)
+ if (super.getRoutingKey() != null)
{
- return super.getRoutingKey();
+ return super.getRoutingKey();
}
else if (getSubject() != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
index 57cd2a1ff5..f50e65214c 100644
--- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
+++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -40,7 +40,6 @@ public enum AMQPFilterTypes
/** The identifying string for the filter type. */
private final AMQShortString _value;
-
/**
* Creates a new filter type from its identifying string.
*
@@ -60,4 +59,10 @@ public enum AMQPFilterTypes
{
return _value;
}
+
+ @Override
+ public String toString()
+ {
+ return _value.asString();
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index 3d116f1b1b..91f56f369b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -83,9 +83,12 @@ public class ReturnUnroutableMandatoryMessageTest extends QpidBrokerTestCase imp
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
+
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
- consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft);
+ consumerSession.declareAndBind(queue, ft);
+
+ consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
index dfd26b474a..646c17d1f2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
@@ -111,7 +111,7 @@ public class BindingLoggingTest extends AbstractTestLogging
String messageID = "BND-1001";
String queueName = _queue.getQueueName();
String exchange = "direct/amq.direct";
- String message = "Create : Arguments : {x-filter-jms-selector=}";
+ String message = "Create";
validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName);
}
@@ -145,7 +145,7 @@ public class BindingLoggingTest extends AbstractTestLogging
// Perform full testing on the binding
String message = getMessageString(fromMessage(getLogMessage(results, 0)));
-
+
validateLogMessage(getLogMessage(results, 0), messageID, message,
"topic/amq.topic", "topic", "clientid:" + getName());
@@ -208,17 +208,17 @@ public class BindingLoggingTest extends AbstractTestLogging
validateMessageID(messageID, log);
String subject = fromSubject(log);
-
+
validateBindingDeleteArguments(subject, "/test");
assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
}
-
+
private void validateBindingDeleteArguments(String subject, String vhostName)
{
String routingKey = AbstractTestLogSubject.getSlice("rk", subject);
-
+
assertTrue("Routing Key does not start with TempQueue:"+routingKey,
routingKey.startsWith("TempQueue"));
assertEquals("Virtualhost not correct.", vhostName,
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 3783b0bd02..67a2988ad1 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.test.client.destination;
@@ -49,7 +49,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
private Connection _connection;
-
+
@Override
public void setUp() throws Exception
{
@@ -57,20 +57,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
_connection = getConnection() ;
_connection.start();
}
-
+
@Override
public void tearDown() throws Exception
{
_connection.close();
super.tearDown();
}
-
+
public void testCreateOptions() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer prod;
MessageConsumer cons;
-
+
// default (create never, assert never) -------------------
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
@@ -84,7 +84,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
try
{
prod = jmsSession.createProducer(dest);
@@ -94,22 +94,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,false));
-
-
+
+
// create always -------------------------------------------
addr1 = "ADDR:testQueue1; { create: always }";
dest = new AMQAnyDestination(addr1);
- cons = jmsSession.createConsumer(dest);
-
+ cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
dest = new AMQAnyDestination(addr1);
@@ -122,32 +122,32 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
-
-
- cons = jmsSession.createConsumer(dest);
-
+
+
+ cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
dest = new AMQAnyDestination(addr1);
try
{
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
try
{
prod = jmsSession.createProducer(dest);
@@ -157,17 +157,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
-
+
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
dest = new AMQAnyDestination(addr1);
-
+
try
{
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
@@ -176,84 +176,84 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
-
+
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
}
-
+
public void testCreateQueue() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-queue/hello; " +
- "{" +
+ "{" +
"create: always, " +
- "node: " +
- "{" +
+ "node: " +
+ "{" +
"durable: true ," +
"x-declare: " +
- "{" +
+ "{" +
"exclusive: true," +
- "arguments: {" +
+ "arguments: {" +
"'qpid.max_size': 1000," +
"'qpid.max_count': 100" +
- "}" +
- "}, " +
- "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
"{exchange : 'amq.fanout'}," +
"{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
"{exchange : 'amq.topic', key : 'a.#'}" +
- "]," +
-
+ "]," +
+
"}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
cons.close();
-
+
// Even if the consumer is closed the queue and the bindings should be intact.
-
+
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
dest.getAddressName(),null, null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getAddressName(),"a.#", null));
-
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ dest.getAddressName(),"a.#", null));
+
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-match","any");
args.put("dep","sales");
args.put("loc","CA");
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
-
+
MessageProducer prod = jmsSession.createProducer(dest);
prod.send(jmsSession.createTextMessage("test"));
-
+
MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue"));
Message m = cons2.receive(1000);
assertNotNull("Should receive message sent to my-queue",m);
assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT));
}
-
+
public void testCreateExchange() throws Exception
{
createExchangeImpl(false, false);
@@ -283,21 +283,21 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String addr = "ADDR:my-exchange/hello; " +
- "{ " +
- "create: always, " +
- "node: " +
+ String addr = "ADDR:my-exchange/hello; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
"{" +
"type: topic, " +
"x-declare: " +
- "{ " +
- "type:direct, " +
+ "{ " +
+ "type:direct, " +
"auto-delete: true" +
createExchangeArgsString(withExchangeArgs, useNonsenseArguments) +
"}" +
"}" +
"}";
-
+
AMQDestination dest = new AMQAnyDestination(addr);
MessageConsumer cons;
@@ -322,20 +322,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
fail("Unexpected exception whilst creating consumer: " + e);
}
}
-
+
assertTrue("Exchange not created as expected",(
(AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
-
+
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
- dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
-
+ (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+ dest.getQueueName(),"hello", null));
+
// The client should be able to query and verify the existence of my-exchange (QPID-2774)
dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
-
+
private String createExchangeArgsString(final boolean withExchangeArgs,
final boolean useNonsenseArguments)
{
@@ -366,60 +366,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
- dest.getAddressName(),"test", null));
-
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ dest.getAddressName(),"test", null));
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
-
+
Address a = Address.parse(headersBinding);
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, a.getOptions()));
}
-
+
/**
* Test goal: Verifies that a producer and consumer creation triggers the correct
* behavior for x-bindings specified in node props.
*/
public void testBindQueueWithArgs() throws Exception
{
-
+
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
-
- String addr = "node: " +
- "{" +
+
+ String addr = "node: " +
+ "{" +
"durable: true ," +
- "x-declare: " +
- "{ " +
+ "x-declare: " +
+ "{ " +
"auto-delete: true," +
"arguments: {'qpid.max_count': 100}" +
"}, " +
"x-bindings: [{exchange : 'amq.direct', key : test}, " +
- "{exchange : 'amq.topic', key : 'a.#'}," +
- headersBinding +
+ "{exchange : 'amq.topic', key : 'a.#'}," +
+ headersBinding +
"]" +
"}" +
"}";
-
+
AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
- MessageConsumer cons = jmsSession.createConsumer(dest1);
- checkQueueForBindings(jmsSession,dest1,headersBinding);
-
+ MessageConsumer cons = jmsSession.createConsumer(dest1);
+ checkQueueForBindings(jmsSession,dest1,headersBinding);
+
AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
- MessageProducer prod = jmsSession.createProducer(dest2);
- checkQueueForBindings(jmsSession,dest2,headersBinding);
+ MessageProducer prod = jmsSession.createProducer(dest2);
+ checkQueueForBindings(jmsSession,dest2,headersBinding);
}
-
+
/**
* Test goal: Verifies the capacity property in address string is handled properly.
* Test strategy:
@@ -427,22 +427,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
* Creates consumer with client ack.
* Sends 15 messages to the queue, tries to receive 10.
* Tries to receive the 11th message and checks if its null.
- *
- * Since capacity is 10 and we haven't acked any messages,
+ *
+ * Since capacity is 10 and we haven't acked any messages,
* we should not have received the 11th.
- *
+ *
* Acks the 10th message and verifies we receive the rest of the msgs.
*/
public void testCapacity() throws Exception
{
verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}");
}
-
+
public void testSourceAndTargetCapacity() throws Exception
{
verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}");
}
-
+
private void verifyCapacity(String address) throws Exception
{
if (!isCppBroker())
@@ -450,13 +450,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
_logger.info("Not C++ broker, exiting test");
return;
}
-
+
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-
+
AMQDestination dest = new AMQAnyDestination(address);
- MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
MessageProducer prod = jmsSession.createProducer(dest);
-
+
for (int i=0; i< 15; i++)
{
prod.send(jmsSession.createTextMessage("msg" + i) );
@@ -475,48 +475,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
}
}
-
+
/**
* Test goal: Verifies if the new address format based destinations
* can be specified and loaded correctly from the properties file.
- *
+ *
*/
public void testLoadingFromPropertiesFile() throws Exception
{
- Hashtable<String,String> map = new Hashtable<String,String>();
- map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
+ Hashtable<String,String> map = new Hashtable<String,String>();
+ map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
"{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}");
-
+
map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }");
map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'");
-
+
PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory();
Context ctx = props.getInitialContext(map);
-
- AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
+
+ AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2");
AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3");
-
+
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons1 = jmsSession.createConsumer(dest1);
+ MessageConsumer cons1 = jmsSession.createConsumer(dest1);
MessageConsumer cons2 = jmsSession.createConsumer(dest2);
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
-
+
assertTrue("Destination1 was not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
-
+
assertTrue("Destination1 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
-
+
assertTrue("Destination2 was not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
-
+
assertTrue("Destination2 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest2.getAddressName(),dest2.getAddressName(), null));
-
+
MessageProducer producer = jmsSession.createProducer(dest3);
producer.send(jmsSession.createTextMessage("Hello"));
TextMessage msg = (TextMessage)cons3.receive(1000);
@@ -527,31 +527,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
* Test goal: Verifies the subject can be overridden using "qpid.subject" message property.
* Test strategy: Creates and address with a default subject "topic1"
* Creates a message with "qpid.subject"="topic2" and sends it.
- * Verifies that the message goes to "topic2" instead of "topic1".
+ * Verifies that the message goes to "topic2" instead of "topic1".
*/
public void testOverridingSubject() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-
+
AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
-
+
MessageProducer prod = jmsSession.createProducer(topic1);
-
+
Message m = jmsSession.createTextMessage("Hello");
m.setStringProperty("qpid.subject", "topic2");
-
+
MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1);
MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
-
+
prod.send(m);
Message msg = consForTopic1.receive(1000);
assertNull("message shouldn't have been sent to topic1",msg);
-
+
msg = consForTopic2.receive(1000);
- assertNotNull("message should have been sent to topic2",msg);
-
+ assertNotNull("message should have been sent to topic2",msg);
+
}
-
+
/**
* Test goal: Verifies that session.createQueue method
* works as expected both with the new and old addressing scheme.
@@ -559,19 +559,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSessionCreateQueue() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
// Using the BURL method
Destination queue = ssn.createQueue("my-queue");
- MessageProducer prod = ssn.createProducer(queue);
+ MessageProducer prod = ssn.createProducer(queue);
MessageConsumer cons = ssn.createConsumer(queue);
assertTrue("my-queue was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
"my-queue","my-queue", null));
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method
// default case
queue = ssn.createQueue("ADDR:my-queue2");
@@ -586,34 +586,34 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"doesn't resolve to an exchange or a queue";
assertEquals(s,e.getCause().getCause().getMessage());
}
-
+
// explicit create case
queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
- prod = ssn.createProducer(queue);
+ prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("",
+ (AMQSession_0_10)ssn).isQueueBound("",
"my-queue2","my-queue2", null));
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method to create a more complicated queue
String addr = "ADDR:amq.direct/x512; {" +
- "link : {name : 'MY.RESP.QUEUE', " +
+ "link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
"arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
queue = ssn.createQueue(addr);
-
+
cons = ssn.createConsumer(queue);
prod = ssn.createProducer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
cons.close();
}
-
+
/**
* Test goal: Verifies that session.creatTopic method works as expected
* both with the new and old addressing scheme.
@@ -635,68 +635,68 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
// Using the BURL method
Topic topic = ssn.createTopic("ACME");
- MessageProducer prod = ssn.createProducer(topic);
+ MessageProducer prod = ssn.createProducer(topic);
MessageConsumer cons = ssn.createConsumer(topic);
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method
topic = ssn.createTopic("ADDR:ACME");
- prod = ssn.createProducer(topic);
+ prod = ssn.createProducer(topic);
cons = ssn.createConsumer(topic);
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
- String addr = "ADDR:vehicles/bus; " +
- "{ " +
- "create: always, " +
- "node: " +
+ String addr = "ADDR:vehicles/bus; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
"{" +
"type: topic, " +
"x-declare: " +
- "{ " +
- "type:direct, " +
+ "{ " +
+ "type:direct, " +
"auto-delete: true" +
createExchangeArgsString(withExchangeArgs, false) +
"}" +
"}, " +
"link: {name : my-topic, " +
"x-bindings: [{exchange : 'vehicles', key : car}, " +
- "{exchange : 'vehicles', key : van}]" +
- "}" +
+ "{exchange : 'vehicles', key : van}]" +
+ "}" +
"}";
-
+
// Using the ADDR method to create a more complicated topic
topic = ssn.createTopic(addr);
cons = ssn.createConsumer(topic);
prod = ssn.createProducer(topic);
-
+
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","bus", null));
-
+
assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","car", null));
-
+
assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","van", null));
-
+
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "van");
prod.send(msg);
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
}
-
+
/**
* Test Goal : Verify the default subjects used for each exchange type.
* The default for amq.topic is "#" and for the rest it's ""
@@ -704,92 +704,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testDefaultSubjects() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct"));
MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic"));
-
+
MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct"));
MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather"));
MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales"));
-
+
queueProducer.send(ssn.createBytesMessage());
assertNotNull("The consumer subscribed to amq.direct " +
"with empty binding key should have received the message ",queueCons.receive(1000));
-
+
topicProducer1.send(ssn.createTextMessage("25c"));
assertEquals("The consumer subscribed to amq.topic " +
"with '#' binding key should have received the message ",
((TextMessage)topicCons.receive(1000)).getText(),"25c");
-
+
topicProducer2.send(ssn.createTextMessage("1000"));
assertEquals("The consumer subscribed to amq.topic " +
"with '#' binding key should have received the message ",
((TextMessage)topicCons.receive(1000)).getText(),"1000");
}
-
+
/**
* Test Goal : Verify that 'mode : browse' works as expected using a regular consumer.
* This indirectly tests ring queues as well.
*/
public void testBrowseMode() throws Exception
{
-
+
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " +
"node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
"x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}";
-
+
Destination dest = ssn.createQueue(addr);
MessageConsumer browseCons = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
-
+
prod.send(ssn.createTextMessage("Test1"));
prod.send(ssn.createTextMessage("Test2"));
-
+
TextMessage msg = (TextMessage)browseCons.receive(1000);
assertEquals("Didn't receive the first message",msg.getText(),"Test1");
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Didn't receive the first message",msg.getText(),"Test2");
-
- browseCons.close();
+
+ browseCons.close();
prod.send(ssn.createTextMessage("Test3"));
browseCons = ssn.createConsumer(dest);
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Should receive the second message again",msg.getText(),"Test2");
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3");
-
+
assertNull("Should not receive anymore messages",browseCons.receive(500));
}
-
+
/**
* Test Goal : When the same destination is used when creating two consumers,
- * If the type == topic, verify that unique subscription queues are created,
+ * If the type == topic, verify that unique subscription queues are created,
* unless subscription queue has a name.
- *
+ *
* If the type == queue, same queue should be shared.
*/
public void testSubscriptionForSameDestination() throws Exception
{
- Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
-
+
prod.send(ssn.createTextMessage("A"));
TextMessage m = (TextMessage)consumer1.receive(1000);
assertEquals("Consumer1 should recieve message A",m.getText(),"A");
m = (TextMessage)consumer2.receive(1000);
assertEquals("Consumer2 should recieve message A",m.getText(),"A");
-
+
consumer1.close();
consumer2.close();
-
+
dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}");
consumer1 = ssn.createConsumer(dest);
try
@@ -798,61 +798,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
}
catch(Exception e)
- {
+ {
}
_connection.close();
-
+
_connection = getConnection() ;
_connection.start();
- ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
dest = ssn.createTopic("ADDR:my_queue; {create: always}");
consumer1 = ssn.createConsumer(dest);
consumer2 = ssn.createConsumer(dest);
prod = ssn.createProducer(dest);
-
+
prod.send(ssn.createTextMessage("A"));
- Message m1 = consumer1.receive(1000);
+ Message m1 = consumer1.receive(1000);
Message m2 = consumer2.receive(1000);
-
+
if (m1 != null)
{
- assertNull("Only one consumer should receive the message",m2);
+ assertNull("Only one consumer should receive the message",m2);
}
else
{
- assertNotNull("Only one consumer should receive the message",m2);
+ assertNotNull("Only one consumer should receive the message",m2);
}
}
-
+
public void testXBindingsWithoutExchangeName() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
String addr = "ADDR:MRKT; " +
"{" +
- "create: receiver," +
+ "create: receiver," +
"node : {type: topic, x-declare: {type: topic} }," +
"link:{" +
"name: my-topic," +
"x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
"}" +
"}";
-
+
// Using the ADDR method to create a more complicated topic
Topic topic = ssn.createTopic(addr);
MessageConsumer cons = ssn.createConsumer(topic);
-
+
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","NYSE.#", null));
-
+
assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","NASDAQ.#", null));
-
+
assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
-
+
MessageProducer prod = ssn.createProducer(topic);
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
@@ -860,7 +860,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
}
-
+
public void testXSubscribeOverrides() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -873,41 +873,41 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
}
catch(Exception e)
- {
+ {
}
}
-
+
public void testQueueReceiversAndTopicSubscriber() throws Exception
{
Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
-
+
QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = qSession.createReceiver(queue);
-
+
TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = tSession.createSubscriber(topic);
-
+
Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
prod1.send(ssn.createTextMessage("test1"));
-
+
MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
prod2.send(ssn.createTextMessage("test2"));
-
+
Message msg1 = receiver.receive();
assertNotNull(msg1);
assertEquals("test1",((TextMessage)msg1).getText());
-
+
Message msg2 = sub.receive();
assertNotNull(msg2);
- assertEquals("test2",((TextMessage)msg2).getText());
+ assertEquals("test2",((TextMessage)msg2).getText());
}
-
+
public void testDurableSubscriber() throws Exception
{
- Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
Properties props = new Properties();
@@ -916,19 +916,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
- props.setProperty("destination.address5", addrStr);
-
- Context ctx = new InitialContext(props);
+ props.setProperty("destination.address5", addrStr);
+
+ Context ctx = new InitialContext(props);
for (int i=1; i < 4; i++)
{
Topic topic = (Topic) ctx.lookup("address"+i);
createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test");
}
-
+
Topic topic = ssn.createTopic("ADDR:news.us");
createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us");
-
+
Topic namedQueue = (Topic) ctx.lookup("address5");
try
{
@@ -1001,10 +1001,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception
- {
+ {
MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr));
-
+
Message m = ssn.createTextMessage(destName);
prod.send(m);
Message msg = cons.receive(1000);
@@ -1012,12 +1012,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertEquals(destName,((TextMessage)msg).getText());
ssn.unsubscribe(destName);
}
-
+
public void testDeleteOptions() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons;
-
+
// default (create never, assert never) -------------------
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
@@ -1031,11 +1031,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
-
-
+
+
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
dest = new AMQAnyDestination(addr2);
try
@@ -1047,11 +1047,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
-
+
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
dest = new AMQAnyDestination(addr3);
try
@@ -1064,43 +1064,43 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, false));
}
-
+
/**
* Test Goals : 1. Test if the client sets the correct accept mode for unreliable
* and at-least-once.
* 2. Test default reliability modes for Queues and Topics.
* 3. Test if an exception is thrown if exactly-once is used.
* 4. Test if an exception is thrown if at-least-once is used with topics.
- *
+ *
* Test Strategy: For goal #1 & #2
* For unreliable and at-least-once the test tries to receives messages
* in client_ack mode but does not ack the messages.
* It will then close the session, recreate a new session
* and will then try to verify the queue depth.
* For unreliable the messages should have been taken off the queue.
- * For at-least-once the messages should be put back onto the queue.
- *
+ * For at-least-once the messages should be put back onto the queue.
+ *
*/
-
+
public void testReliabilityOptions() throws Exception
{
String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
acceptModeTest(addr1,0);
-
+
String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
acceptModeTest(addr2,2);
-
+
// Default accept-mode for topics
- acceptModeTest("ADDR:amq.topic/test",0);
-
+ acceptModeTest("ADDR:amq.topic/test",0);
+
// Default accept-mode for queues
acceptModeTest("ADDR:testQueue1;{create: always}",2);
-
- String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
+
+ String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
try
{
AMQAnyDestination dest = new AMQAnyDestination(addr3);
@@ -1111,83 +1111,83 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
}
}
-
+
private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
{
Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons;
MessageProducer prod;
-
+
AMQDestination dest = new AMQAnyDestination(address);
cons = ssn.createConsumer(dest);
prod = ssn.createProducer(dest);
-
+
for (int i=0; i < expectedQueueDepth; i++)
{
prod.send(ssn.createTextMessage("Msg" + i));
}
-
+
for (int i=0; i < expectedQueueDepth; i++)
{
Message msg = cons.receive(1000);
assertNotNull(msg);
assertEquals("Msg" + i,((TextMessage)msg).getText());
}
-
+
ssn.close();
ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
- assertEquals(expectedQueueDepth,queueDepth);
+ long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
+ assertEquals(expectedQueueDepth,queueDepth);
cons.close();
- prod.close();
+ prod.close();
}
-
+
public void testDestinationOnSend() throws Exception
{
Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
MessageProducer prod = ssn.createProducer(null);
-
+
Topic queue = ssn.createTopic("ADDR:amq.topic/test");
prod.send(queue,ssn.createTextMessage("A"));
-
+
Message msg = cons.receive(1000);
assertNotNull(msg);
assertEquals("A",((TextMessage)msg).getText());
prod.close();
cons.close();
}
-
+
public void testReplyToWithNamelessExchange() throws Exception
{
System.setProperty("qpid.declare_exchanges","false");
replyToTest("ADDR:my-queue;{create: always}");
System.setProperty("qpid.declare_exchanges","true");
}
-
+
public void testReplyToWithCustomExchange() throws Exception
{
replyToTest("ADDR:hello;{create:always,node:{type:topic}}");
}
-
+
private void replyToTest(String replyTo) throws Exception
{
- Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination replyToDest = AMQDestination.createDestination(replyTo);
MessageConsumer replyToCons = session.createConsumer(replyToDest);
-
+
Destination dest = session.createQueue("ADDR:amq.direct/test");
-
+
MessageConsumer cons = session.createConsumer(dest);
MessageProducer prod = session.createProducer(dest);
Message m = session.createTextMessage("test");
m.setJMSReplyTo(replyToDest);
prod.send(m);
-
+
Message msg = cons.receive();
MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
prodR.send(session.createTextMessage("x"));
-
+
Message m1 = replyToCons.receive();
assertNotNull("The reply to consumer should have received the messsage",m1);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index 626592dc10..5dcf678510 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -207,21 +207,21 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener
}
assertTrue("No exception thrown!", caught);
caught = false;
-
+
}
-
+
public void testRuntimeSelectorError() throws JMSException
{
Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
MessageProducer producer = session.createProducer(_destination);
Message sentMsg = session.createTextMessage();
-
+
sentMsg.setIntProperty("testproperty", 1); // 1 % 5
producer.send(sentMsg);
Message recvd = consumer.receive(RECIEVE_TIMEOUT);
assertNotNull(recvd);
-
+
sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
producer.send(sentMsg);
try
@@ -231,47 +231,47 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener
}
catch (Exception e)
{
-
+
}
assertFalse("Connection should not be closed", _connection.isClosed());
}
-
+
public void testSelectorWithJMSMessageID() throws Exception
{
Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
-
+
MessageProducer prod = session.createProducer(_destination);
MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
-
+
for (int i=0; i<2; i++)
{
Message msg = session.createTextMessage("Msg" + String.valueOf(i));
prod.send(msg);
}
session.commit();
-
+
Message msg1 = consumer.receive(1000);
Message msg2 = consumer.receive(1000);
-
+
Assert.assertNotNull("Msg1 should not be null", msg1);
Assert.assertNotNull("Msg2 should not be null", msg2);
-
+
session.commit();
-
+
prod.setDisableMessageID(true);
-
- for (int i=0; i<2; i++)
+
+ for (int i=2; i<4; i++)
{
Message msg = session.createTextMessage("Msg" + String.valueOf(i));
prod.send(msg);
}
-
+
session.commit();
- Message msg3 = consumer.receive(1000);
+ Message msg3 = consumer.receive(1000);
Assert.assertNull("Msg3 should be null", msg3);
session.commit();
consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
-
+
Message msg4 = consumer.receive(1000);
Message msg5 = consumer.receive(1000);
session.commit();
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index e861b4f4ee..f8ab593c88 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -56,19 +56,19 @@ public class StreamMessageTest extends QpidBrokerTestCase
public void testStreamMessageEOF() throws Exception
{
- Connection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQHeadersExchange queue =
new AMQHeadersExchange(new AMQBindingURL(
ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME
+ "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
+
FieldTable ft = new FieldTable();
ft.setString("x-match", "any");
ft.setString("F1000", "1");
- MessageConsumer consumer =
- consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
-
+ consumerSession.declareAndBind(queue, ft);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
// force synch to ensure the consumer has resulted in a bound queue
// ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
// This is the default now
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 5dae98fe21..6bf20d7708 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -37,6 +40,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.url.URLSyntaxException;
/** @author Apache Software Foundation */
@@ -225,6 +229,44 @@ public class TopicSessionTest extends QpidBrokerTestCase
AMQTopic topic = new AMQTopic(con, "testNoLocal");
+ noLocalTest(con, topic);
+
+
+ con.close();
+ }
+
+
+ public void testNoLocalDirectExchange() throws Exception
+ {
+
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+
+ AMQTopic topic = new AMQTopic("direct://amq.direct/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'");
+
+ noLocalTest(con, topic);
+
+
+ con.close();
+ }
+
+
+
+ public void testNoLocalFanoutExchange() throws Exception
+ {
+
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+
+ AMQTopic topic = new AMQTopic("fanout://amq.fanout/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'");
+
+ noLocalTest(con, topic);
+
+ con.close();
+ }
+
+
+ private void noLocalTest(AMQConnection con, AMQTopic topic)
+ throws JMSException, URLSyntaxException, AMQException, NamingException
+ {
TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber noLocal = session1.createSubscriber(topic, "", true);
@@ -304,9 +346,6 @@ public class TopicSessionTest extends QpidBrokerTestCase
//test nolocal subscriber does message
m = (TextMessage) noLocal.receive(1000);
assertNotNull(m);
-
-
- con.close();
con2.close();
}