summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/messaging')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java332
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java172
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java148
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java45
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java103
5 files changed, 800 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
new file mode 100644
index 0000000000..368ec60525
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
+import org.apache.qpid.configuration.Accessor;
+import org.apache.qpid.configuration.Accessor.MapAccessor;
+import org.apache.qpid.messaging.Address;
+
+/**
+ * Utility class for extracting information from the address class
+ */
+public class AddressHelper
+{
+ public static final String NODE = "node";
+ public static final String LINK = "link";
+ public static final String X_DECLARE = "x-declare";
+ public static final String X_BINDINGS = "x-bindings";
+ public static final String X_SUBSCRIBE = "x-subscribes";
+ public static final String CREATE = "create";
+ public static final String ASSERT = "assert";
+ public static final String DELETE = "delete";
+ public static final String FILTER = "filter";
+ public static final String NO_LOCAL = "no-local";
+ public static final String DURABLE = "durable";
+ public static final String EXCLUSIVE = "exclusive";
+ public static final String AUTO_DELETE = "auto-delete";
+ public static final String TYPE = "type";
+ public static final String ALT_EXCHANGE = "alternate-exchange";
+ public static final String BINDINGS = "bindings";
+ public static final String BROWSE = "browse";
+ public static final String MODE = "mode";
+ public static final String CAPACITY = "capacity";
+ public static final String CAPACITY_SOURCE = "source";
+ public static final String CAPACITY_TARGET = "target";
+ public static final String NAME = "name";
+ public static final String EXCHANGE = "exchange";
+ public static final String QUEUE = "queue";
+ public static final String KEY = "key";
+ public static final String ARGUMENTS = "arguments";
+ public static final String RELIABILITY = "reliability";
+
+ private Address address;
+ private Accessor addressProps;
+ private Accessor nodeProps;
+ private Accessor linkProps;
+
+ public AddressHelper(Address address)
+ {
+ this.address = address;
+ addressProps = new MapAccessor(address.getOptions());
+ Map node_props = address.getOptions() == null
+ || address.getOptions().get(NODE) == null ? null
+ : (Map) address.getOptions().get(NODE);
+
+ if (node_props != null)
+ {
+ nodeProps = new MapAccessor(node_props);
+ }
+
+ Map link_props = address.getOptions() == null
+ || address.getOptions().get(LINK) == null ? null
+ : (Map) address.getOptions().get(LINK);
+
+ if (link_props != null)
+ {
+ linkProps = new MapAccessor(link_props);
+ }
+ }
+
+ public String getCreate()
+ {
+ return addressProps.getString(CREATE);
+ }
+
+ public String getAssert()
+ {
+ return addressProps.getString(ASSERT);
+ }
+
+ public String getDelete()
+ {
+ return addressProps.getString(DELETE);
+ }
+
+ public boolean isNoLocal()
+ {
+ Boolean b = nodeProps.getBoolean(NO_LOCAL);
+ return b == null ? false : b;
+ }
+
+ public boolean isBrowseOnly()
+ {
+ String mode = addressProps.getString(MODE);
+ return mode != null && mode.equals(BROWSE) ? true : false;
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<Binding> getBindings(Map props)
+ {
+ List<Binding> bindings = new ArrayList<Binding>();
+ List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
+ if (bindingList != null)
+ {
+ for (Map bindingMap : bindingList)
+ {
+ Binding binding = new Binding(
+ (String) bindingMap.get(EXCHANGE),
+ (String) bindingMap.get(QUEUE),
+ (String) bindingMap.get(KEY),
+ bindingMap.get(ARGUMENTS) == null ? Collections.EMPTY_MAP
+ : (Map<String, Object>) bindingMap
+ .get(ARGUMENTS));
+ bindings.add(binding);
+ }
+ }
+ return bindings;
+ }
+
+ public Map getDeclareArgs(Map props)
+ {
+ if (props != null && props.get(X_DECLARE) != null)
+ {
+ return (Map) props.get(X_DECLARE);
+
+ } else
+ {
+ return Collections.EMPTY_MAP;
+ }
+ }
+
+ public int getTargetNodeType() throws Exception
+ {
+ if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ {
+ // need to query and figure out
+ return AMQDestination.UNKNOWN_TYPE;
+ } else if (nodeProps.getString(TYPE).equals("queue"))
+ {
+ return AMQDestination.QUEUE_TYPE;
+ } else if (nodeProps.getString(TYPE).equals("topic"))
+ {
+ return AMQDestination.TOPIC_TYPE;
+ } else
+ {
+ throw new Exception("unkown exchange type");
+ }
+ }
+
+ public Node getTargetNode(int addressType)
+ {
+ // target node here is the default exchange
+ if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
+ {
+ return new ExchangeNode();
+ } else if (addressType == AMQDestination.TOPIC_TYPE)
+ {
+ Map node = (Map) address.getOptions().get(NODE);
+ return createExchangeNode(node);
+ } else
+ {
+ // don't know yet
+ return null;
+ }
+ }
+
+ private Node createExchangeNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ ExchangeNode node = new ExchangeNode();
+ node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
+ .getString(TYPE));
+ fillInCommonNodeArgs(node, parent, argsMap);
+ return node;
+ }
+
+ private Node createQueueNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ QueueNode node = new QueueNode();
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
+ : argsMap.getBoolean(EXCLUSIVE));
+ fillInCommonNodeArgs(node, parent, argsMap);
+
+ return node;
+ }
+
+ private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
+ {
+ node.setDurable(getDurability(parent));
+ node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
+ : argsMap.getBoolean(AUTO_DELETE));
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setBindings(getBindings(parent));
+ if (getDeclareArgs(parent).containsKey(ARGUMENTS))
+ {
+ node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
+ }
+ }
+
+ private boolean getDurability(Map map)
+ {
+ Accessor access = new MapAccessor(map);
+ Boolean result = access.getBoolean(DURABLE);
+ return (result == null) ? false : result.booleanValue();
+ }
+
+ /**
+ * if the type == queue x-declare args from the node props is used. if the
+ * type == exchange x-declare args from the link props is used else just
+ * create a default temp queue.
+ */
+ public Node getSourceNode(int addressType)
+ {
+ if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
+ {
+ return createQueueNode((Map) address.getOptions().get(NODE));
+ }
+ if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
+ {
+ return createQueueNode((Map) address.getOptions().get(LINK));
+ } else
+ {
+ // need to query the info
+ return new QueueNode();
+ }
+ }
+
+ public Link getLink() throws Exception
+ {
+ Link link = new Link();
+ link.setSubscription(new Subscription());
+ if (linkProps != null)
+ {
+ link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
+ : linkProps.getBoolean(DURABLE));
+ link.setName(linkProps.getString(NAME));
+
+ String reliability = linkProps.getString(RELIABILITY);
+ if ( reliability != null)
+ {
+ if (reliability.equalsIgnoreCase("unreliable"))
+ {
+ link.setReliability(Reliability.UNRELIABLE);
+ }
+ else if (reliability.equalsIgnoreCase("at-least-once"))
+ {
+ link.setReliability(Reliability.AT_LEAST_ONCE);
+ }
+ else
+ {
+ throw new Exception("The reliability mode '" +
+ reliability + "' is not yet supported");
+ }
+
+ }
+
+ if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ {
+ MapAccessor capacityProps = new MapAccessor(
+ (Map) ((Map) address.getOptions().get(LINK))
+ .get(CAPACITY));
+ link
+ .setConsumerCapacity(capacityProps
+ .getInt(CAPACITY_SOURCE) == null ? 0
+ : capacityProps.getInt(CAPACITY_SOURCE));
+ link
+ .setProducerCapacity(capacityProps
+ .getInt(CAPACITY_TARGET) == null ? 0
+ : capacityProps.getInt(CAPACITY_TARGET));
+ }
+ else
+ {
+ int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
+ .getInt(CAPACITY);
+ link.setConsumerCapacity(cap);
+ link.setProducerCapacity(cap);
+ }
+ link.setFilter(linkProps.getString(FILTER));
+ // so far filter type not used
+
+ if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ {
+ Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+
+ link.getSubscription().setExclusive(exclusive);
+ }
+ }
+
+ return link;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
new file mode 100644
index 0000000000..5f97d625b4
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+
+public class Link
+{
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+
+ protected String name;
+ protected String _filter;
+ protected FilterType _filterType = FilterType.SUBJECT;
+ protected boolean _isNoLocal;
+ protected boolean _isDurable;
+ protected int _consumerCapacity = 0;
+ protected int _producerCapacity = 0;
+ protected Node node;
+ protected Subscription subscription;
+ protected Reliability reliability = UNSPECIFIED;
+
+ public Reliability getReliability()
+ {
+ return reliability;
+ }
+
+ public void setReliability(Reliability reliability)
+ {
+ this.reliability = reliability;
+ }
+
+ public Node getNode()
+ {
+ return node;
+ }
+
+ public void setNode(Node node)
+ {
+ this.node = node;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public String getFilter()
+ {
+ return _filter;
+ }
+
+ public void setFilter(String filter)
+ {
+ this._filter = filter;
+ }
+
+ public FilterType getFilterType()
+ {
+ return _filterType;
+ }
+
+ public void setFilterType(FilterType type)
+ {
+ _filterType = type;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _isNoLocal;
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _isNoLocal = noLocal;
+ }
+
+ public int getConsumerCapacity()
+ {
+ return _consumerCapacity;
+ }
+
+ public void setConsumerCapacity(int capacity)
+ {
+ _consumerCapacity = capacity;
+ }
+
+ public int getProducerCapacity()
+ {
+ return _producerCapacity;
+ }
+
+ public void setProducerCapacity(int capacity)
+ {
+ _producerCapacity = capacity;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ this.subscription = subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private boolean exclusive = false;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
new file mode 100644
index 0000000000..c98b194334
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.messaging.address;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQDestination.Binding;
+
+public abstract class Node
+{
+ protected int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ protected boolean _isDurable;
+ protected boolean _isAutoDelete;
+ protected String _alternateExchange;
+ protected List<Binding> _bindings = new ArrayList<Binding>();
+ protected Map<String,Object> _declareArgs = Collections.emptyMap();
+
+ public int getType()
+ {
+ return _nodeType;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public void addBinding(Binding binding) {
+ this._bindings.add(binding);
+ }
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _declareArgs;
+ }
+
+ public void setDeclareArgs(Map<String,Object> options)
+ {
+ _declareArgs = options;
+ }
+
+ public static class QueueNode extends Node
+ {
+ protected boolean _isExclusive;
+ protected QpidQueueOptions _queueOptions = new QpidQueueOptions();
+
+ public QueueNode()
+ {
+ _nodeType = AMQDestination.QUEUE_TYPE;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+ }
+
+ public static class ExchangeNode extends Node
+ {
+ protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
+ protected String _exchangeType;
+
+ public ExchangeNode()
+ {
+ _nodeType = AMQDestination.TOPIC_TYPE;
+ }
+
+ public String getExchangeType()
+ {
+ return _exchangeType;
+ }
+
+ public void setExchangeType(String exchangeType)
+ {
+ _exchangeType = exchangeType;
+ }
+
+ }
+
+ public static class UnknownNodeType extends Node
+ {
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java
new file mode 100644
index 0000000000..3ad9aff9ea
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidExchangeOptions.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import java.util.HashMap;
+
+public class QpidExchangeOptions extends HashMap<String,Object>
+{
+ public static final String QPID_MSG_SEQUENCE = "qpid.msg_sequence";
+ public static final String QPID_INITIAL_VALUE_EXCHANGE = "qpid.ive";
+ public static final String QPID_EXCLUSIVE_BINDING = "qpid.exclusive-binding";
+
+ public void setMessageSequencing()
+ {
+ this.put(QPID_MSG_SEQUENCE, 1);
+ }
+
+ public void setInitialValueExchange()
+ {
+ this.put(QPID_INITIAL_VALUE_EXCHANGE, 1);
+ }
+
+ public void setExclusiveBinding()
+ {
+ this.put(QPID_EXCLUSIVE_BINDING, 1);
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
new file mode 100644
index 0000000000..04aa7d146f
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import java.util.HashMap;
+
+public class QpidQueueOptions extends HashMap<String,Object>
+{
+ public static final String QPID_MAX_COUNT = "qpid.max_count";
+ public static final String QPID_MAX_SIZE = "qpid.max_size";
+ public static final String QPID_POLICY_TYPE = "qpid.policy_type";
+ public static final String QPID_PERSIST_LAST_NODE = "qpid.persist_last_node";
+ public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse";
+ public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation";
+
+ public void validatePolicyType(String type)
+ {
+ if (type == null ||
+ !("reject".equals(type) || "flow_to_disk".equals(type) ||
+ "ring".equals(type) || "ring_strict".equals(type)))
+ {
+ throw new IllegalArgumentException("Invalid Queue Policy Type" +
+ " should be one of {reject|flow_to_disk|ring|ring_strict}");
+ }
+ }
+
+ public void setPolicyType(String s)
+ {
+ validatePolicyType(s);
+ this.put(QPID_POLICY_TYPE, s);
+ }
+
+ public void setMaxCount(Integer i)
+ {
+ this.put(QPID_MAX_COUNT, i);
+ }
+
+ public void setMaxSize(Integer i)
+ {
+ this.put(QPID_MAX_SIZE, i);
+ }
+
+ public void setPersistLastNode()
+ {
+ this.put(QPID_PERSIST_LAST_NODE, 1);
+ }
+
+ public void setOrderingPolicy(String s)
+ {
+ if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE.equals(s))
+ {
+ this.put(QPID_LAST_VALUE_QUEUE, 1);
+ }
+ else if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE.equals(s))
+ {
+ this.put(QPID_LAST_VALUE_QUEUE_NO_BROWSE,1);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid Ordering Policy" +
+ " should be one of {" + QpidQueueOptions.QPID_LAST_VALUE_QUEUE + "|" +
+ QPID_LAST_VALUE_QUEUE_NO_BROWSE + "}");
+ }
+ }
+
+ public void setLvqKey(String key)
+ {
+ this.put(QPID_LVQ_KEY, key);
+ }
+
+ public void setQueueEvents(String value)
+ {
+ if (value != null && (value.equals("1") || value.equals("2")))
+ {
+ this.put(QPID_QUEUE_EVENT_GENERATION, value);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid value for " +
+ QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}");
+ }
+ }
+}