summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-22 20:32:43 +0000
committerRobert Greig <rgreig@apache.org>2006-12-22 20:32:43 +0000
commit56e155dc210a1e46796b18f179a00a56ae52a0e7 (patch)
tree9add891a6b632f7d1eeefeb2019079efce7739df /java/broker/src
parenta71fc76d0f605d1faa76f8aec7b4498c168aec46 (diff)
downloadqpid-python-56e155dc210a1e46796b18f179a00a56ae52a0e7.tar.gz
QPID-229 : Patch supplied by Rob Godfrey - Change implementation of FieldTable
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java143
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java42
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java7
3 files changed, 136 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index b058211288..1c63a5571e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -21,12 +21,11 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
-import java.util.Collections;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
/**
* Defines binding and matching based on a set of headers.
@@ -35,11 +34,53 @@ class HeadersBinding
{
private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
- private final Map _mappings = new HashMap();
- private final Set<Object> required = new HashSet<Object>();
- private final Set<Map.Entry> matches = new HashSet<Map.Entry>();
+ private final FieldTable _mappings = new FieldTable();
+ private final Set<String> required = new HashSet<String>();
+ private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
+ private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
+ {
+ private Boolean _result = Boolean.FALSE;
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
+ {
+ _result = Boolean.TRUE;
+ return false;
+ }
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return _result;
+ }
+ }
+
+ private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
+ {
+ Boolean _result = Boolean.FALSE;
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ if(required.contains(propertyName))
+ {
+ _result = Boolean.TRUE;
+ return false;
+ }
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return _result;
+ }
+ }
+
+
+
/**
* Creates a binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
@@ -47,33 +88,50 @@ class HeadersBinding
* define a required match of value.
* @param mappings the defined mappings this binding should use
*/
- HeadersBinding(Map mappings)
+
+ HeadersBinding(FieldTable mappings)
{
- //noinspection unchecked
- this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet());
- _mappings.putAll(mappings);
+ Enumeration propertyNames = mappings.getPropertyNames();
+ while(propertyNames.hasMoreElements())
+ {
+ String propName = (String) propertyNames.nextElement();
+ _mappings.put(propName, mappings.getObject(propName));
+ }
+ initMappings();
}
- private HeadersBinding(Set<Map.Entry> entries)
+ private void initMappings()
{
- for (Map.Entry e : entries)
+
+ _mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
- if (isSpecial(e.getKey()))
- {
- processSpecial((String) e.getKey(), e.getValue());
- }
- else if (e.getValue() == null || e.getValue().equals(""))
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
{
- required.add(e.getKey());
+ if (isSpecial(propertyName))
+ {
+ processSpecial(propertyName, value.getValue());
+ }
+ else if (value.getValue() == null || value.getValue().equals(""))
+ {
+ required.add(propertyName);
+ }
+ else
+ {
+ matches.put(propertyName,value.getValue());
+ }
+
+ return true;
}
- else
+
+ public Object getResult()
{
- matches.add(e);
+ return null;
}
- }
+ });
}
- protected Map getMappings()
+ protected FieldTable getMappings()
{
return _mappings;
}
@@ -84,7 +142,7 @@ class HeadersBinding
* @return true if the headers define any required keys and match any required
* values
*/
- public boolean matches(Map headers)
+ public boolean matches(FieldTable headers)
{
if(headers == null)
{
@@ -96,18 +154,37 @@ class HeadersBinding
}
}
- private boolean and(Map headers)
+ private boolean and(FieldTable headers)
{
- //need to match all the defined mapping rules:
- return headers.keySet().containsAll(required)
- && headers.entrySet().containsAll(matches);
+ if(headers.keys().containsAll(required))
+ {
+ for(Map.Entry<String, Object> e : matches.entrySet())
+ {
+ if(!e.getValue().equals(headers.getObject(e.getKey())))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- private boolean or(Map headers)
+
+ private boolean or(final FieldTable headers)
{
- //only need to match one mapping rule:
- return !Collections.disjoint(headers.keySet(), required)
- || !Collections.disjoint(headers.entrySet(), matches);
+ if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+ {
+ return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+ || (required.isEmpty() && matches.isEmpty());
+ }
+ else
+ {
+ return true;
+ }
}
private void processSpecial(String key, Object value)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 8c4df68dea..229502d2a6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -22,10 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -34,10 +31,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.JMException;
import javax.management.openmbean.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -119,16 +113,24 @@ public class HeadersExchange extends AbstractExchange
String queueName = registration.queue.getName();
HeadersBinding headers = registration.binding;
- Map<Object, Object> headerMappings = headers.getMappings();
- List<String> mappingList = new ArrayList<String>();
+ FieldTable headerMappings = headers.getMappings();
+ final List<String> mappingList = new ArrayList<String>();
- for (Map.Entry<Object, Object> en : headerMappings.entrySet())
+ headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
- String key = en.getKey().toString();
- String value = en.getValue().toString();
- mappingList.add(key + "=" + value);
- }
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ mappingList.add(propertyName + "=" + value.getValue());
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return mappingList;
+ }
+ });
+
Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
@@ -155,7 +157,7 @@ public class HeadersExchange extends AbstractExchange
}
String[] bindings = binding.split(",");
- FieldTable fieldTable = FieldTableFactory.newFieldTable();
+ FieldTable bindingMap = new FieldTable();
for (int i = 0; i < bindings.length; i++)
{
String[] keyAndValue = bindings[i].split("=");
@@ -163,10 +165,10 @@ public class HeadersExchange extends AbstractExchange
{
throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
}
- fieldTable.put(keyAndValue[0], keyAndValue[1]);
+ bindingMap.setString(keyAndValue[0], keyAndValue[1]);
}
- _bindings.add(new Registration(new HeadersBinding(fieldTable), queue));
+ _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
}
} // End of MBean class
@@ -185,7 +187,7 @@ public class HeadersExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- Map headers = getHeaders(payload.getContentHeaderBody());
+ FieldTable headers = getHeaders(payload.getContentHeaderBody());
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
@@ -248,7 +250,7 @@ public class HeadersExchange extends AbstractExchange
return !_bindings.isEmpty();
}
- protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
+ protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
//but these are not yet implemented.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index f8c15d937a..10aa621f89 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -24,17 +24,18 @@ import java.util.Map;
import java.util.HashMap;
import junit.framework.TestCase;
+import org.apache.qpid.framing.FieldTable;
/**
*/
public class HeadersBindingTest extends TestCase
{
- private Map<String, String> bindHeaders = new HashMap<String, String>();
- private Map<String, String> matchHeaders = new HashMap<String, String>();
+ private FieldTable bindHeaders = new FieldTable();
+ private FieldTable matchHeaders = new FieldTable();
public void testDefault_1()
{
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("A", "Value of A");
matchHeaders.put("A", "Value of A");