diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/messaging')
3 files changed, 194 insertions, 184 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 318fe32d36..72fc74e19c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -20,21 +20,20 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - /** * Utility class for extracting information from the address class */ @@ -68,58 +67,56 @@ public class AddressHelper public static final String ARGUMENTS = "arguments"; public static final String RELIABILITY = "reliability"; - private Address address; - private Accessor addressProps; - private Accessor nodeProps; - private Accessor linkProps; + private Address _address; + private Accessor _addressPropAccess; + private Accessor _nodePropAccess; + private Accessor _linkPropAccess; + private Map _addressPropMap; + private Map _nodePropMap; + private Map _linkPropMap; public AddressHelper(Address address) { - this.address = address; - addressProps = new MapAccessor(address.getOptions()); - Map node_props = address.getOptions() == null + this._address = address; + this._addressPropMap = address.getOptions(); + this._addressPropAccess = new MapAccessor(_addressPropMap); + this._nodePropMap = address.getOptions() == null || address.getOptions().get(NODE) == null ? null : (Map) address.getOptions().get(NODE); - if (node_props != null) + if (_nodePropMap != null) { - nodeProps = new MapAccessor(node_props); + _nodePropAccess = new MapAccessor(_nodePropMap); } - Map link_props = address.getOptions() == null + this._linkPropMap = address.getOptions() == null || address.getOptions().get(LINK) == null ? null : (Map) address.getOptions().get(LINK); - if (link_props != null) + if (_linkPropMap != null) { - linkProps = new MapAccessor(link_props); + _linkPropAccess = new MapAccessor(_linkPropMap); } } public String getCreate() { - return addressProps.getString(CREATE); + return _addressPropAccess.getString(CREATE); } public String getAssert() { - return addressProps.getString(ASSERT); + return _addressPropAccess.getString(ASSERT); } public String getDelete() { - return addressProps.getString(DELETE); - } - - public boolean isNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; + return _addressPropAccess.getString(DELETE); } public boolean isBrowseOnly() { - String mode = addressProps.getString(MODE); + String mode = _addressPropAccess.getString(MODE); return mode != null && mode.equals(BROWSE) ? true : false; } @@ -127,7 +124,7 @@ public class AddressHelper public List<Binding> getBindings(Map props) { List<Binding> bindings = new ArrayList<Binding>(); - List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); + List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS); if (bindingList != null) { for (Map bindingMap : bindingList) @@ -157,117 +154,70 @@ public class AddressHelper } } - public int getTargetNodeType() throws Exception + public int getNodeType() throws Exception { - if (nodeProps == null || nodeProps.getString(TYPE) == null) + if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { // need to query and figure out return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) + } + else if (_nodePropAccess.getString(TYPE).equals("queue")) { return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) + } + else if (_nodePropAccess.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; - } else + } + else { throw new Exception("unkown exchange type"); } } - public Node getTargetNode(int addressType) + public Node getNode() { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else + Node node = new Node(_address.getName()); + if (_nodePropAccess != null) { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } + Map xDeclareMap = getDeclareArgs(_nodePropMap); + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); + node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false)); + node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false)); + node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false)); + node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + if (xDeclareMapAccessor.getString(TYPE) != null) + { + node.setExchangeType(xDeclareMapAccessor.getString(TYPE)); + } + node.setBindings(getBindings(_nodePropMap)); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return node; } - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) + // This should really be in the Accessor interface + private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue) { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } + Boolean result = access.getBoolean(propName); + return (result == null) ? defaultValue : result.booleanValue(); } public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); - if (linkProps != null) + link.setSubscriptionQueue(new SubscriptionQueue()); + if (_linkPropAccess != null) { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); + link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false)); + link.setName(_linkPropAccess.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); + String reliability = _linkPropAccess.getString(RELIABILITY); if ( reliability != null) { if (reliability.equalsIgnoreCase("unreliable")) @@ -283,13 +233,12 @@ public class AddressHelper throw new Exception("The reliability mode '" + reliability + "' is not yet supported"); } - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) + (Map) ((Map) _address.getOptions().get(LINK)) .get(CAPACITY)); link .setConsumerCapacity(capacityProps @@ -302,17 +251,19 @@ public class AddressHelper } else { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess .getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } - link.setFilter(linkProps.getString(FILTER)); + link.setFilter(_linkPropAccess.getString(FILTER)); // so far filter type not used - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + Map linkMap = (Map) _address.getOptions().get(LINK); + + if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); if (x_subscribe.containsKey(ARGUMENTS)) { @@ -324,6 +275,18 @@ public class AddressHelper link.getSubscription().setExclusive(exclusive); } + + link.setBindings(getBindings(linkMap)); + Map xDeclareMap = getDeclareArgs(linkMap); + SubscriptionQueue queue = link.getSubscriptionQueue(); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); + queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 41f6725c8f..40a84ebd02 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination.Binding; + public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } @@ -36,10 +41,11 @@ public class Link private boolean _isDurable; private int _consumerCapacity = 0; private int _producerCapacity = 0; - private Node node; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; - + private List<Binding> _bindings = new ArrayList<Binding>(); + private SubscriptionQueue _subscriptionQueue; + public Reliability getReliability() { return reliability; @@ -50,21 +56,11 @@ public class Link this.reliability = reliability; } - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - public boolean isDurable() { return _isDurable; } - + public void setDurable(boolean durable) { _isDurable = durable; @@ -139,6 +135,74 @@ public class Link { this.subscription = subscription; } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public SubscriptionQueue getSubscriptionQueue() + { + return _subscriptionQueue; + } + + public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue) + { + this._subscriptionQueue = subscriptionQueue; + } + + public static class SubscriptionQueue + { + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); + private boolean _isAutoDelete = true; + private boolean _isExclusive = true; + private String _alternateExchange; + + public Map<String,Object> getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map<String,Object> options) + { + _declareArgs = options; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + } public static class Subscription { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index 0da0327885..005f98f344 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -public abstract class Node +public class Node { private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private String _name; private boolean _isDurable; private boolean _isAutoDelete; + private boolean _isExclusive; private String _alternateExchange; + private String _exchangeType = "topic"; // used when node is an exchange instead of a queue. private List<Binding> _bindings = new ArrayList<Binding>(); - private Map<String,Object> _declareArgs = Collections.emptyMap(); + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); - protected Node(int nodeType) + protected Node(String name) + { + _name = name; + } + + public String getName() + { + return _name; + } + + public void setNodeType(int nodeType) { _nodeType = nodeType; } @@ -58,6 +72,16 @@ public abstract class Node _isDurable = durable; } + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + public boolean isAutoDelete() { return _isAutoDelete; @@ -100,56 +124,15 @@ public abstract class Node public void setDeclareArgs(Map<String,Object> options) { _declareArgs = options; - } - - public static class QueueNode extends Node - { - private boolean _isExclusive; - private QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - super(AMQDestination.QUEUE_TYPE); - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } } - - public static class ExchangeNode extends Node - { - private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - private String _exchangeType; - - public ExchangeNode() - { - super(AMQDestination.TOPIC_TYPE); - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - + + public void setExchangeType(String type) + { + _exchangeType = type; } - - public static class UnknownNodeType extends Node + + public String getExchangeType() { - public UnknownNodeType() - { - super(AMQDestination.UNKNOWN_TYPE); - } + return _exchangeType; } } |