diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address')
5 files changed, 800 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java new file mode 100644 index 0000000000..368ec60525 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.client.messaging.address.Link.Subscription; +import org.apache.qpid.client.messaging.address.Node.ExchangeNode; +import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; +import org.apache.qpid.configuration.Accessor; +import org.apache.qpid.configuration.Accessor.MapAccessor; +import org.apache.qpid.messaging.Address; + +/** + * Utility class for extracting information from the address class + */ +public class AddressHelper +{ + public static final String NODE = "node"; + public static final String LINK = "link"; + public static final String X_DECLARE = "x-declare"; + public static final String X_BINDINGS = "x-bindings"; + public static final String X_SUBSCRIBE = "x-subscribes"; + public static final String CREATE = "create"; + public static final String ASSERT = "assert"; + public static final String DELETE = "delete"; + public static final String FILTER = "filter"; + public static final String NO_LOCAL = "no-local"; + public static final String DURABLE = "durable"; + public static final String EXCLUSIVE = "exclusive"; + public static final String AUTO_DELETE = "auto-delete"; + public static final String TYPE = "type"; + public static final String ALT_EXCHANGE = "alternate-exchange"; + public static final String BINDINGS = "bindings"; + public static final String BROWSE = "browse"; + public static final String MODE = "mode"; + public static final String CAPACITY = "capacity"; + public static final String CAPACITY_SOURCE = "source"; + public static final String CAPACITY_TARGET = "target"; + public static final String NAME = "name"; + public static final String EXCHANGE = "exchange"; + public static final String QUEUE = "queue"; + public static final String KEY = "key"; + public static final String ARGUMENTS = "arguments"; + public static final String RELIABILITY = "reliability"; + + private Address address; + private Accessor addressProps; + private Accessor nodeProps; + private Accessor linkProps; + + public AddressHelper(Address address) + { + this.address = address; + addressProps = new MapAccessor(address.getOptions()); + Map node_props = address.getOptions() == null + || address.getOptions().get(NODE) == null ? null + : (Map) address.getOptions().get(NODE); + + if (node_props != null) + { + nodeProps = new MapAccessor(node_props); + } + + Map link_props = address.getOptions() == null + || address.getOptions().get(LINK) == null ? null + : (Map) address.getOptions().get(LINK); + + if (link_props != null) + { + linkProps = new MapAccessor(link_props); + } + } + + public String getCreate() + { + return addressProps.getString(CREATE); + } + + public String getAssert() + { + return addressProps.getString(ASSERT); + } + + public String getDelete() + { + return addressProps.getString(DELETE); + } + + public boolean isNoLocal() + { + Boolean b = nodeProps.getBoolean(NO_LOCAL); + return b == null ? false : b; + } + + public boolean isBrowseOnly() + { + String mode = addressProps.getString(MODE); + return mode != null && mode.equals(BROWSE) ? true : false; + } + + @SuppressWarnings("unchecked") + public List<Binding> getBindings(Map props) + { + List<Binding> bindings = new ArrayList<Binding>(); + List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); + if (bindingList != null) + { + for (Map bindingMap : bindingList) + { + Binding binding = new Binding( + (String) bindingMap.get(EXCHANGE), + (String) bindingMap.get(QUEUE), + (String) bindingMap.get(KEY), + bindingMap.get(ARGUMENTS) == null ? Collections.EMPTY_MAP + : (Map<String, Object>) bindingMap + .get(ARGUMENTS)); + bindings.add(binding); + } + } + return bindings; + } + + public Map getDeclareArgs(Map props) + { + if (props != null && props.get(X_DECLARE) != null) + { + return (Map) props.get(X_DECLARE); + + } else + { + return Collections.EMPTY_MAP; + } + } + + public int getTargetNodeType() throws Exception + { + if (nodeProps == null || nodeProps.getString(TYPE) == null) + { + // need to query and figure out + return AMQDestination.UNKNOWN_TYPE; + } else if (nodeProps.getString(TYPE).equals("queue")) + { + return AMQDestination.QUEUE_TYPE; + } else if (nodeProps.getString(TYPE).equals("topic")) + { + return AMQDestination.TOPIC_TYPE; + } else + { + throw new Exception("unkown exchange type"); + } + } + + public Node getTargetNode(int addressType) + { + // target node here is the default exchange + if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) + { + return new ExchangeNode(); + } else if (addressType == AMQDestination.TOPIC_TYPE) + { + Map node = (Map) address.getOptions().get(NODE); + return createExchangeNode(node); + } else + { + // don't know yet + return null; + } + } + + private Node createExchangeNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + ExchangeNode node = new ExchangeNode(); + node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap + .getString(TYPE)); + fillInCommonNodeArgs(node, parent, argsMap); + return node; + } + + private Node createQueueNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + QueueNode node = new QueueNode(); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false + : argsMap.getBoolean(EXCLUSIVE)); + fillInCommonNodeArgs(node, parent, argsMap); + + return node; + } + + private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) + { + node.setDurable(getDurability(parent)); + node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false + : argsMap.getBoolean(AUTO_DELETE)); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setBindings(getBindings(parent)); + if (getDeclareArgs(parent).containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); + } + } + + private boolean getDurability(Map map) + { + Accessor access = new MapAccessor(map); + Boolean result = access.getBoolean(DURABLE); + return (result == null) ? false : result.booleanValue(); + } + + /** + * if the type == queue x-declare args from the node props is used. if the + * type == exchange x-declare args from the link props is used else just + * create a default temp queue. + */ + public Node getSourceNode(int addressType) + { + if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) + { + return createQueueNode((Map) address.getOptions().get(NODE)); + } + if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) + { + return createQueueNode((Map) address.getOptions().get(LINK)); + } else + { + // need to query the info + return new QueueNode(); + } + } + + public Link getLink() throws Exception + { + Link link = new Link(); + link.setSubscription(new Subscription()); + if (linkProps != null) + { + link.setDurable(linkProps.getBoolean(DURABLE) == null ? false + : linkProps.getBoolean(DURABLE)); + link.setName(linkProps.getString(NAME)); + + String reliability = linkProps.getString(RELIABILITY); + if ( reliability != null) + { + if (reliability.equalsIgnoreCase("unreliable")) + { + link.setReliability(Reliability.UNRELIABLE); + } + else if (reliability.equalsIgnoreCase("at-least-once")) + { + link.setReliability(Reliability.AT_LEAST_ONCE); + } + else + { + throw new Exception("The reliability mode '" + + reliability + "' is not yet supported"); + } + + } + + if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + { + MapAccessor capacityProps = new MapAccessor( + (Map) ((Map) address.getOptions().get(LINK)) + .get(CAPACITY)); + link + .setConsumerCapacity(capacityProps + .getInt(CAPACITY_SOURCE) == null ? 0 + : capacityProps.getInt(CAPACITY_SOURCE)); + link + .setProducerCapacity(capacityProps + .getInt(CAPACITY_TARGET) == null ? 0 + : capacityProps.getInt(CAPACITY_TARGET)); + } + else + { + int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + .getInt(CAPACITY); + link.setConsumerCapacity(cap); + link.setProducerCapacity(cap); + } + link.setFilter(linkProps.getString(FILTER)); + // so far filter type not used + + if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + { + Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } + } + + return link; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java new file mode 100644 index 0000000000..5f97d625b4 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.client.messaging.address.Node.QueueNode; + +public class Link +{ + public enum FilterType { SQL92, XQUERY, SUBJECT } + + public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } + + protected String name; + protected String _filter; + protected FilterType _filterType = FilterType.SUBJECT; + protected boolean _isNoLocal; + protected boolean _isDurable; + protected int _consumerCapacity = 0; + protected int _producerCapacity = 0; + protected Node node; + protected Subscription subscription; + protected Reliability reliability = UNSPECIFIED; + + public Reliability getReliability() + { + return reliability; + } + + public void setReliability(Reliability reliability) + { + this.reliability = reliability; + } + + public Node getNode() + { + return node; + } + + public void setNode(Node node) + { + this.node = node; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public String getFilter() + { + return _filter; + } + + public void setFilter(String filter) + { + this._filter = filter; + } + + public FilterType getFilterType() + { + return _filterType; + } + + public void setFilterType(FilterType type) + { + _filterType = type; + } + + public boolean isNoLocal() + { + return _isNoLocal; + } + + public void setNoLocal(boolean noLocal) + { + _isNoLocal = noLocal; + } + + public int getConsumerCapacity() + { + return _consumerCapacity; + } + + public void setConsumerCapacity(int capacity) + { + _consumerCapacity = capacity; + } + + public int getProducerCapacity() + { + return _producerCapacity; + } + + public void setProducerCapacity(int capacity) + { + _producerCapacity = capacity; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private boolean exclusive = false; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java new file mode 100644 index 0000000000..c98b194334 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.messaging.address; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.naming.OperationNotSupportedException; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.Binding; + +public abstract class Node +{ + protected int _nodeType = AMQDestination.UNKNOWN_TYPE; + protected boolean _isDurable; + protected boolean _isAutoDelete; + protected String _alternateExchange; + protected List<Binding> _bindings = new ArrayList<Binding>(); + protected Map<String,Object> _declareArgs = Collections.emptyMap(); + + public int getType() + { + return _nodeType; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public void addBinding(Binding binding) { + this._bindings.add(binding); + } + + public Map<String,Object> getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map<String,Object> options) + { + _declareArgs = options; + } + + public static class QueueNode extends Node + { + protected boolean _isExclusive; + protected QpidQueueOptions _queueOptions = new QpidQueueOptions(); + + public QueueNode() + { + _nodeType = AMQDestination.QUEUE_TYPE; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + } + + public static class ExchangeNode extends Node + { + protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); + protected String _exchangeType; + + public ExchangeNode() + { + _nodeType = AMQDestination.TOPIC_TYPE; + } + + public String getExchangeType() + { + return _exchangeType; + } + + public void setExchangeType(String exchangeType) + { + _exchangeType = exchangeType; + } + + } + + public static class UnknownNodeType extends Node + { + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java new file mode 100644 index 0000000000..3ad9aff9ea --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import java.util.HashMap; + +public class QpidExchangeOptions extends HashMap<String,Object> +{ + public static final String QPID_MSG_SEQUENCE = "qpid.msg_sequence"; + public static final String QPID_INITIAL_VALUE_EXCHANGE = "qpid.ive"; + public static final String QPID_EXCLUSIVE_BINDING = "qpid.exclusive-binding"; + + public void setMessageSequencing() + { + this.put(QPID_MSG_SEQUENCE, 1); + } + + public void setInitialValueExchange() + { + this.put(QPID_INITIAL_VALUE_EXCHANGE, 1); + } + + public void setExclusiveBinding() + { + this.put(QPID_EXCLUSIVE_BINDING, 1); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java new file mode 100644 index 0000000000..04aa7d146f --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import java.util.HashMap; + +public class QpidQueueOptions extends HashMap<String,Object> +{ + public static final String QPID_MAX_COUNT = "qpid.max_count"; + public static final String QPID_MAX_SIZE = "qpid.max_size"; + public static final String QPID_POLICY_TYPE = "qpid.policy_type"; + public static final String QPID_PERSIST_LAST_NODE = "qpid.persist_last_node"; + public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; + public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; + + public void validatePolicyType(String type) + { + if (type == null || + !("reject".equals(type) || "flow_to_disk".equals(type) || + "ring".equals(type) || "ring_strict".equals(type))) + { + throw new IllegalArgumentException("Invalid Queue Policy Type" + + " should be one of {reject|flow_to_disk|ring|ring_strict}"); + } + } + + public void setPolicyType(String s) + { + validatePolicyType(s); + this.put(QPID_POLICY_TYPE, s); + } + + public void setMaxCount(Integer i) + { + this.put(QPID_MAX_COUNT, i); + } + + public void setMaxSize(Integer i) + { + this.put(QPID_MAX_SIZE, i); + } + + public void setPersistLastNode() + { + this.put(QPID_PERSIST_LAST_NODE, 1); + } + + public void setOrderingPolicy(String s) + { + if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE.equals(s)) + { + this.put(QPID_LAST_VALUE_QUEUE, 1); + } + else if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE.equals(s)) + { + this.put(QPID_LAST_VALUE_QUEUE_NO_BROWSE,1); + } + else + { + throw new IllegalArgumentException("Invalid Ordering Policy" + + " should be one of {" + QpidQueueOptions.QPID_LAST_VALUE_QUEUE + "|" + + QPID_LAST_VALUE_QUEUE_NO_BROWSE + "}"); + } + } + + public void setLvqKey(String key) + { + this.put(QPID_LVQ_KEY, key); + } + + public void setQueueEvents(String value) + { + if (value != null && (value.equals("1") || value.equals("2"))) + { + this.put(QPID_QUEUE_EVENT_GENERATION, value); + } + else + { + throw new IllegalArgumentException("Invalid value for " + + QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); + } + } +} |