summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/messaging')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java203
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java90
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java85
3 files changed, 194 insertions, 184 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 318fe32d36..72fc74e19c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -20,21 +20,20 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Link.Subscription;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.configuration.Accessor;
import org.apache.qpid.configuration.Accessor.MapAccessor;
import org.apache.qpid.messaging.Address;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* Utility class for extracting information from the address class
*/
@@ -68,58 +67,56 @@ public class AddressHelper
public static final String ARGUMENTS = "arguments";
public static final String RELIABILITY = "reliability";
- private Address address;
- private Accessor addressProps;
- private Accessor nodeProps;
- private Accessor linkProps;
+ private Address _address;
+ private Accessor _addressPropAccess;
+ private Accessor _nodePropAccess;
+ private Accessor _linkPropAccess;
+ private Map _addressPropMap;
+ private Map _nodePropMap;
+ private Map _linkPropMap;
public AddressHelper(Address address)
{
- this.address = address;
- addressProps = new MapAccessor(address.getOptions());
- Map node_props = address.getOptions() == null
+ this._address = address;
+ this._addressPropMap = address.getOptions();
+ this._addressPropAccess = new MapAccessor(_addressPropMap);
+ this._nodePropMap = address.getOptions() == null
|| address.getOptions().get(NODE) == null ? null
: (Map) address.getOptions().get(NODE);
- if (node_props != null)
+ if (_nodePropMap != null)
{
- nodeProps = new MapAccessor(node_props);
+ _nodePropAccess = new MapAccessor(_nodePropMap);
}
- Map link_props = address.getOptions() == null
+ this._linkPropMap = address.getOptions() == null
|| address.getOptions().get(LINK) == null ? null
: (Map) address.getOptions().get(LINK);
- if (link_props != null)
+ if (_linkPropMap != null)
{
- linkProps = new MapAccessor(link_props);
+ _linkPropAccess = new MapAccessor(_linkPropMap);
}
}
public String getCreate()
{
- return addressProps.getString(CREATE);
+ return _addressPropAccess.getString(CREATE);
}
public String getAssert()
{
- return addressProps.getString(ASSERT);
+ return _addressPropAccess.getString(ASSERT);
}
public String getDelete()
{
- return addressProps.getString(DELETE);
- }
-
- public boolean isNoLocal()
- {
- Boolean b = nodeProps.getBoolean(NO_LOCAL);
- return b == null ? false : b;
+ return _addressPropAccess.getString(DELETE);
}
public boolean isBrowseOnly()
{
- String mode = addressProps.getString(MODE);
+ String mode = _addressPropAccess.getString(MODE);
return mode != null && mode.equals(BROWSE) ? true : false;
}
@@ -127,7 +124,7 @@ public class AddressHelper
public List<Binding> getBindings(Map props)
{
List<Binding> bindings = new ArrayList<Binding>();
- List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
+ List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS);
if (bindingList != null)
{
for (Map bindingMap : bindingList)
@@ -157,117 +154,70 @@ public class AddressHelper
}
}
- public int getTargetNodeType() throws Exception
+ public int getNodeType() throws Exception
{
- if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
// need to query and figure out
return AMQDestination.UNKNOWN_TYPE;
- } else if (nodeProps.getString(TYPE).equals("queue"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("queue"))
{
return AMQDestination.QUEUE_TYPE;
- } else if (nodeProps.getString(TYPE).equals("topic"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("topic"))
{
return AMQDestination.TOPIC_TYPE;
- } else
+ }
+ else
{
throw new Exception("unkown exchange type");
}
}
- public Node getTargetNode(int addressType)
+ public Node getNode()
{
- // target node here is the default exchange
- if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
- {
- return new ExchangeNode();
- } else if (addressType == AMQDestination.TOPIC_TYPE)
- {
- Map node = (Map) address.getOptions().get(NODE);
- return createExchangeNode(node);
- } else
+ Node node = new Node(_address.getName());
+ if (_nodePropAccess != null)
{
- // don't know yet
- return null;
- }
- }
-
- private Node createExchangeNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- ExchangeNode node = new ExchangeNode();
- node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
- .getString(TYPE));
- fillInCommonNodeArgs(node, parent, argsMap);
- return node;
- }
+ Map xDeclareMap = getDeclareArgs(_nodePropMap);
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
- private Node createQueueNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- QueueNode node = new QueueNode();
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
- : argsMap.getBoolean(EXCLUSIVE));
- fillInCommonNodeArgs(node, parent, argsMap);
-
- return node;
- }
-
- private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
- {
- node.setDurable(getDurability(parent));
- node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
- : argsMap.getBoolean(AUTO_DELETE));
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setBindings(getBindings(parent));
- if (getDeclareArgs(parent).containsKey(ARGUMENTS))
- {
- node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
+ node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false));
+ node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false));
+ node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false));
+ node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ if (xDeclareMapAccessor.getString(TYPE) != null)
+ {
+ node.setExchangeType(xDeclareMapAccessor.getString(TYPE));
+ }
+ node.setBindings(getBindings(_nodePropMap));
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
- }
-
- private boolean getDurability(Map map)
- {
- Accessor access = new MapAccessor(map);
- Boolean result = access.getBoolean(DURABLE);
- return (result == null) ? false : result.booleanValue();
+ return node;
}
- /**
- * if the type == queue x-declare args from the node props is used. if the
- * type == exchange x-declare args from the link props is used else just
- * create a default temp queue.
- */
- public Node getSourceNode(int addressType)
+ // This should really be in the Accessor interface
+ private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue)
{
- if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(NODE));
- }
- if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(LINK));
- } else
- {
- // need to query the info
- return new QueueNode();
- }
+ Boolean result = access.getBoolean(propName);
+ return (result == null) ? defaultValue : result.booleanValue();
}
public Link getLink() throws Exception
{
Link link = new Link();
link.setSubscription(new Subscription());
- if (linkProps != null)
+ link.setSubscriptionQueue(new SubscriptionQueue());
+ if (_linkPropAccess != null)
{
- link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
- : linkProps.getBoolean(DURABLE));
- link.setName(linkProps.getString(NAME));
+ link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false));
+ link.setName(_linkPropAccess.getString(NAME));
- String reliability = linkProps.getString(RELIABILITY);
+ String reliability = _linkPropAccess.getString(RELIABILITY);
if ( reliability != null)
{
if (reliability.equalsIgnoreCase("unreliable"))
@@ -283,13 +233,12 @@ public class AddressHelper
throw new Exception("The reliability mode '" +
reliability + "' is not yet supported");
}
-
}
- if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) address.getOptions().get(LINK))
+ (Map) ((Map) _address.getOptions().get(LINK))
.get(CAPACITY));
link
.setConsumerCapacity(capacityProps
@@ -302,17 +251,19 @@ public class AddressHelper
}
else
{
- int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
- link.setFilter(linkProps.getString(FILTER));
+ link.setFilter(_linkPropAccess.getString(FILTER));
// so far filter type not used
- if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ Map linkMap = (Map) _address.getOptions().get(LINK);
+
+ if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
{
- Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
if (x_subscribe.containsKey(ARGUMENTS))
{
@@ -324,6 +275,18 @@ public class AddressHelper
link.getSubscription().setExclusive(exclusive);
}
+
+ link.setBindings(getBindings(linkMap));
+ Map xDeclareMap = getDeclareArgs(linkMap);
+ SubscriptionQueue queue = link.getSubscriptionQueue();
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
+ queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true));
+ queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
+ queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
return link;
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 41f6725c8f..40a84ebd02 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.client.AMQDestination.Binding;
+
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
@@ -36,10 +41,11 @@ public class Link
private boolean _isDurable;
private int _consumerCapacity = 0;
private int _producerCapacity = 0;
- private Node node;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
-
+ private List<Binding> _bindings = new ArrayList<Binding>();
+ private SubscriptionQueue _subscriptionQueue;
+
public Reliability getReliability()
{
return reliability;
@@ -50,21 +56,11 @@ public class Link
this.reliability = reliability;
}
- public Node getNode()
- {
- return node;
- }
-
- public void setNode(Node node)
- {
- this.node = node;
- }
-
public boolean isDurable()
{
return _isDurable;
}
-
+
public void setDurable(boolean durable)
{
_isDurable = durable;
@@ -139,6 +135,74 @@ public class Link
{
this.subscription = subscription;
}
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public SubscriptionQueue getSubscriptionQueue()
+ {
+ return _subscriptionQueue;
+ }
+
+ public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue)
+ {
+ this._subscriptionQueue = subscriptionQueue;
+ }
+
+ public static class SubscriptionQueue
+ {
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
+ private boolean _isAutoDelete = true;
+ private boolean _isExclusive = true;
+ private String _alternateExchange;
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _declareArgs;
+ }
+
+ public void setDeclareArgs(Map<String,Object> options)
+ {
+ _declareArgs = options;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+ }
public static class Subscription
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
index 0da0327885..005f98f344 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class Node
+public class Node
{
private int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ private String _name;
private boolean _isDurable;
private boolean _isAutoDelete;
+ private boolean _isExclusive;
private String _alternateExchange;
+ private String _exchangeType = "topic"; // used when node is an exchange instead of a queue.
private List<Binding> _bindings = new ArrayList<Binding>();
- private Map<String,Object> _declareArgs = Collections.emptyMap();
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
- protected Node(int nodeType)
+ protected Node(String name)
+ {
+ _name = name;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setNodeType(int nodeType)
{
_nodeType = nodeType;
}
@@ -58,6 +72,16 @@ public abstract class Node
_isDurable = durable;
}
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
public boolean isAutoDelete()
{
return _isAutoDelete;
@@ -100,56 +124,15 @@ public abstract class Node
public void setDeclareArgs(Map<String,Object> options)
{
_declareArgs = options;
- }
-
- public static class QueueNode extends Node
- {
- private boolean _isExclusive;
- private QpidQueueOptions _queueOptions = new QpidQueueOptions();
-
- public QueueNode()
- {
- super(AMQDestination.QUEUE_TYPE);
- }
-
- public boolean isExclusive()
- {
- return _isExclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _isExclusive = exclusive;
- }
}
-
- public static class ExchangeNode extends Node
- {
- private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
- private String _exchangeType;
-
- public ExchangeNode()
- {
- super(AMQDestination.TOPIC_TYPE);
- }
-
- public String getExchangeType()
- {
- return _exchangeType;
- }
-
- public void setExchangeType(String exchangeType)
- {
- _exchangeType = exchangeType;
- }
-
+
+ public void setExchangeType(String type)
+ {
+ _exchangeType = type;
}
-
- public static class UnknownNodeType extends Node
+
+ public String getExchangeType()
{
- public UnknownNodeType()
- {
- super(AMQDestination.UNKNOWN_TYPE);
- }
+ return _exchangeType;
}
}