diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-22 20:32:43 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-22 20:32:43 +0000 |
commit | 56e155dc210a1e46796b18f179a00a56ae52a0e7 (patch) | |
tree | 9add891a6b632f7d1eeefeb2019079efce7739df /java/broker/src | |
parent | a71fc76d0f605d1faa76f8aec7b4498c168aec46 (diff) | |
download | qpid-python-56e155dc210a1e46796b18f179a00a56ae52a0e7.tar.gz |
QPID-229 : Patch supplied by Rob Godfrey - Change implementation of FieldTable
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
3 files changed, 136 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index b058211288..1c63a5571e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -21,12 +21,11 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQTypedValue; -import java.util.Collections; -import java.util.Map; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; +import java.util.*; /** * Defines binding and matching based on a set of headers. @@ -35,11 +34,53 @@ class HeadersBinding { private static final Logger _logger = Logger.getLogger(HeadersBinding.class); - private final Map _mappings = new HashMap(); - private final Set<Object> required = new HashSet<Object>(); - private final Set<Map.Entry> matches = new HashSet<Map.Entry>(); + private final FieldTable _mappings = new FieldTable(); + private final Set<String> required = new HashSet<String>(); + private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; + private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor + { + private Boolean _result = Boolean.FALSE; + + public boolean processElement(String propertyName, AMQTypedValue value) + { + if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName))) + { + _result = Boolean.TRUE; + return false; + } + return true; + } + + public Object getResult() + { + return _result; + } + } + + private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor + { + Boolean _result = Boolean.FALSE; + + public boolean processElement(String propertyName, AMQTypedValue value) + { + if(required.contains(propertyName)) + { + _result = Boolean.TRUE; + return false; + } + return true; + } + + public Object getResult() + { + return _result; + } + } + + + /** * Creates a binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with @@ -47,33 +88,50 @@ class HeadersBinding * define a required match of value. * @param mappings the defined mappings this binding should use */ - HeadersBinding(Map mappings) + + HeadersBinding(FieldTable mappings) { - //noinspection unchecked - this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet()); - _mappings.putAll(mappings); + Enumeration propertyNames = mappings.getPropertyNames(); + while(propertyNames.hasMoreElements()) + { + String propName = (String) propertyNames.nextElement(); + _mappings.put(propName, mappings.getObject(propName)); + } + initMappings(); } - private HeadersBinding(Set<Map.Entry> entries) + private void initMappings() { - for (Map.Entry e : entries) + + _mappings.processOverElements(new FieldTable.FieldTableElementProcessor() { - if (isSpecial(e.getKey())) - { - processSpecial((String) e.getKey(), e.getValue()); - } - else if (e.getValue() == null || e.getValue().equals("")) + + public boolean processElement(String propertyName, AMQTypedValue value) { - required.add(e.getKey()); + if (isSpecial(propertyName)) + { + processSpecial(propertyName, value.getValue()); + } + else if (value.getValue() == null || value.getValue().equals("")) + { + required.add(propertyName); + } + else + { + matches.put(propertyName,value.getValue()); + } + + return true; } - else + + public Object getResult() { - matches.add(e); + return null; } - } + }); } - protected Map getMappings() + protected FieldTable getMappings() { return _mappings; } @@ -84,7 +142,7 @@ class HeadersBinding * @return true if the headers define any required keys and match any required * values */ - public boolean matches(Map headers) + public boolean matches(FieldTable headers) { if(headers == null) { @@ -96,18 +154,37 @@ class HeadersBinding } } - private boolean and(Map headers) + private boolean and(FieldTable headers) { - //need to match all the defined mapping rules: - return headers.keySet().containsAll(required) - && headers.entrySet().containsAll(matches); + if(headers.keys().containsAll(required)) + { + for(Map.Entry<String, Object> e : matches.entrySet()) + { + if(!e.getValue().equals(headers.getObject(e.getKey()))) + { + return false; + } + } + return true; + } + else + { + return false; + } } - private boolean or(Map headers) + + private boolean or(final FieldTable headers) { - //only need to match one mapping rule: - return !Collections.disjoint(headers.keySet(), required) - || !Collections.disjoint(headers.entrySet(), matches); + if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor())) + { + return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor())) + || (required.isEmpty() && matches.isEmpty()); + } + else + { + return true; + } } private void processSpecial(String key, Object value) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 8c4df68dea..229502d2a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -22,10 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.*; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; @@ -34,10 +31,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.JMException; import javax.management.openmbean.*; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -119,16 +113,24 @@ public class HeadersExchange extends AbstractExchange String queueName = registration.queue.getName(); HeadersBinding headers = registration.binding; - Map<Object, Object> headerMappings = headers.getMappings(); - List<String> mappingList = new ArrayList<String>(); + FieldTable headerMappings = headers.getMappings(); + final List<String> mappingList = new ArrayList<String>(); - for (Map.Entry<Object, Object> en : headerMappings.entrySet()) + headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor() { - String key = en.getKey().toString(); - String value = en.getValue().toString(); - mappingList.add(key + "=" + value); - } + public boolean processElement(String propertyName, AMQTypedValue value) + { + mappingList.add(propertyName + "=" + value.getValue()); + return true; + } + + public Object getResult() + { + return mappingList; + } + }); + Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])}; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); @@ -155,7 +157,7 @@ public class HeadersExchange extends AbstractExchange } String[] bindings = binding.split(","); - FieldTable fieldTable = FieldTableFactory.newFieldTable(); + FieldTable bindingMap = new FieldTable(); for (int i = 0; i < bindings.length; i++) { String[] keyAndValue = bindings[i].split("="); @@ -163,10 +165,10 @@ public class HeadersExchange extends AbstractExchange { throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" "); } - fieldTable.put(keyAndValue[0], keyAndValue[1]); + bindingMap.setString(keyAndValue[0], keyAndValue[1]); } - _bindings.add(new Registration(new HeadersBinding(fieldTable), queue)); + _bindings.add(new Registration(new HeadersBinding(bindingMap), queue)); } } // End of MBean class @@ -185,7 +187,7 @@ public class HeadersExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - Map headers = getHeaders(payload.getContentHeaderBody()); + FieldTable headers = getHeaders(payload.getContentHeaderBody()); if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); @@ -248,7 +250,7 @@ public class HeadersExchange extends AbstractExchange return !_bindings.isEmpty(); } - protected Map getHeaders(ContentHeaderBody contentHeaderFrame) + protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame) { //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, //but these are not yet implemented. diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index f8c15d937a..10aa621f89 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -24,17 +24,18 @@ import java.util.Map; import java.util.HashMap; import junit.framework.TestCase; +import org.apache.qpid.framing.FieldTable; /** */ public class HeadersBindingTest extends TestCase { - private Map<String, String> bindHeaders = new HashMap<String, String>(); - private Map<String, String> matchHeaders = new HashMap<String, String>(); + private FieldTable bindHeaders = new FieldTable(); + private FieldTable matchHeaders = new FieldTable(); public void testDefault_1() { - bindHeaders.put("A", "Value of A"); + bindHeaders.setString("A", "Value of A"); matchHeaders.put("A", "Value of A"); |