summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/messaging/address
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/messaging/address')
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java157
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java51
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/Link.java112
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/Node.java59
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java141
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java108
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java62
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java110
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java253
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java278
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java48
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java86
17 files changed, 1669 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java
new file mode 100644
index 0000000000..c1b2000db0
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.messaging.address;
+
+public class AddressException extends Exception
+{
+ public AddressException(String message)
+ {
+ super(message);
+ }
+
+ public AddressException(String message, Throwable cause)
+ {
+ super(message,cause);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java
new file mode 100644
index 0000000000..7f5b1f9cd1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import static org.apache.qpid.messaging.address.AddressProperty.*;
+
+import org.apache.qpid.configuration.Accessor;
+import org.apache.qpid.configuration.Accessor.NestedMapAccessor;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AddressHelper
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AddressHelper.class);
+
+ protected Address address;
+ protected Accessor addressProps;
+
+ public AddressHelper(Address address)
+ {
+ this.address = address;
+ addressProps = new NestedMapAccessor(address.getOptions());
+ }
+
+ public PolicyType getCreate()
+ {
+ return PolicyType.getPolicyType(addressProps.getString(CREATE));
+ }
+
+ public PolicyType getAssert()
+ {
+ return PolicyType.getPolicyType(addressProps.getString(ASSERT));
+ }
+
+ public PolicyType getDelete()
+ {
+ return PolicyType.getPolicyType(addressProps.getString(DELETE));
+ }
+
+ public boolean isBrowseOnly()
+ {
+ String mode = addressProps.getString(MODE);
+ return mode != null && mode.equals(BROWSE) ? true : false;
+ }
+
+ public AddressType getNodeType()
+ {
+ String type = addressProps.getString(getFQN(NODE,TYPE));
+ if ("topic".equalsIgnoreCase(type))
+ {
+ return AddressType.TOPIC_ADDRESS;
+ }
+ else if ("queue".equalsIgnoreCase(type))
+ {
+ return AddressType.QUEUE_ADDRESS;
+ }
+ else
+ {
+ return AddressType.UNSPECIFIED;
+ }
+ }
+
+ public boolean isNodeDurable()
+ {
+ Boolean b = addressProps.getBoolean(getFQN(NODE,DURABLE));
+ return b == null ? false : b.booleanValue();
+ }
+
+ public String getLinkName()
+ {
+ return addressProps.getString(getFQN(LINK,NAME));
+ }
+
+ public boolean isLinkDurable()
+ {
+ Boolean b = addressProps.getBoolean(getFQN(LINK,DURABLE));
+ return b == null ? false : b.booleanValue();
+ }
+
+ public String getLinkReliability()
+ {
+ return addressProps.getString(getFQN(LINK,RELIABILITY));
+ }
+
+ public int getLinkProducerCapacity()
+ {
+ return getCapacity(CheckMode.FOR_SENDER);
+ }
+
+ public int getLinkConsumerCapacity()
+ {
+ return getCapacity(CheckMode.FOR_RECEIVER);
+ }
+
+ private int getCapacity(CheckMode mode)
+ {
+ int capacity = 0;
+ try
+ {
+ capacity = addressProps.getInt(getFQN(LINK,CAPACITY));
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ if (mode == CheckMode.FOR_RECEIVER)
+ {
+ capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_SOURCE));
+ }
+ else
+ {
+ capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_TARGET));
+ }
+ }
+ catch(Exception ex)
+ {
+ if (ex instanceof NumberFormatException && !ex.getMessage().equals("null"))
+ {
+ _logger.info("Unable to retrieve capacity from address: " + address,ex);
+ }
+ }
+ }
+
+ return capacity;
+ }
+
+ public static boolean isAllowed(PolicyType policy, CheckMode mode)
+ {
+ return (policy == PolicyType.ALWAYS ||
+ (policy == PolicyType.RECEIVER && mode == CheckMode.FOR_RECEIVER) ||
+ (policy == PolicyType.SENDER && mode == CheckMode.FOR_SENDER)
+ );
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java
new file mode 100644
index 0000000000..26b0f2e676
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+public final class AddressProperty
+{
+ public static final String NODE = "node";
+ public static final String LINK = "link";
+ public static final String CREATE = "create";
+ public static final String ASSERT = "assert";
+ public static final String DELETE = "delete";
+ public static final String FILTER = "filter";
+ public static final String NO_LOCAL = "no-local";
+ public static final String DURABLE = "durable";
+ public static final String TYPE = "type";
+ public static final String BROWSE = "browse";
+ public static final String MODE = "mode";
+ public static final String CAPACITY = "capacity";
+ public static final String CAPACITY_SOURCE = "source";
+ public static final String CAPACITY_TARGET = "target";
+ public static final String NAME = "name";
+ public static final String RELIABILITY = "reliability";
+
+ public static String getFQN(String... propNames)
+ {
+ StringBuilder sb = new StringBuilder();
+ for(String prop: propNames)
+ {
+ sb.append(prop).append("/");
+ }
+ return sb.substring(0, sb.length() -1);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java
new file mode 100644
index 0000000000..d7cf23138b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination;
+public interface AddressResolver
+{
+ public QpidDestination resolve(Address address) throws AddressException;
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java
new file mode 100644
index 0000000000..55eb86372a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import static org.apache.qpid.messaging.address.Link.Reliability.UNSPECIFIED;
+
+public class Link
+{
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ public enum Reliability
+ {
+ UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED;
+
+ public static Reliability getReliability(String reliability) throws AddressException
+ {
+ if (reliability == null)
+ {
+ return UNSPECIFIED;
+ }
+ else if (reliability.equalsIgnoreCase("unreliable"))
+ {
+ return UNRELIABLE;
+ }
+ else if (reliability.equalsIgnoreCase("at-least-once"))
+ {
+ return AT_LEAST_ONCE;
+ }
+ else
+ {
+ throw new AddressException("The reliability mode '" +
+ reliability + "' is not yet supported");
+ }
+ }
+ }
+
+ protected String name;
+ protected String filter;
+ protected FilterType filterType = FilterType.SUBJECT;
+ protected boolean noLocal;
+ protected boolean durable;
+ protected int consumerCapacity = 0;
+ protected int producerCapacity = 0;
+ protected Reliability reliability = UNSPECIFIED;
+
+ public Link(AddressHelper helper) throws AddressException
+ {
+ name = helper.getLinkName();
+ durable = helper.isLinkDurable();
+ reliability = Reliability.getReliability(helper.getLinkReliability());
+ consumerCapacity = helper.getLinkConsumerCapacity();
+ producerCapacity = helper.getLinkProducerCapacity();
+ }
+
+ public Reliability getReliability()
+ {
+ return reliability;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getFilter()
+ {
+ return filter;
+ }
+
+ public FilterType getFilterType()
+ {
+ return filterType;
+ }
+
+ public boolean isNoLocal()
+ {
+ return noLocal;
+ }
+
+ public int getConsumerCapacity()
+ {
+ return consumerCapacity;
+ }
+
+ public int getProducerCapacity()
+ {
+ return producerCapacity;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java
new file mode 100644
index 0000000000..29c65f0a1d
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import org.apache.qpid.messaging.Address.PolicyType;
+
+public class Node
+{
+ boolean durable = false;
+ PolicyType createPolicy = PolicyType.NEVER;
+ PolicyType assertPolicy = PolicyType.NEVER;
+ PolicyType deletePolicy = PolicyType.NEVER;
+
+ public Node(AddressHelper helper)
+ {
+ durable = helper.isNodeDurable();
+ createPolicy = helper.getCreate();
+ assertPolicy = helper.getAssert();
+ deletePolicy = helper.getDelete();
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public PolicyType getCreatePolicy()
+ {
+ return createPolicy;
+ }
+
+ public PolicyType getAssertPolicy()
+ {
+ return assertPolicy;
+ }
+
+ public PolicyType getDeletePolicy()
+ {
+ return deletePolicy;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java
new file mode 100644
index 0000000000..243b2365e5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import static org.apache.qpid.messaging.address.AddressProperty.*;
+import static org.apache.qpid.messaging.address.amqp_0_10.AddressProperty_0_10.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.configuration.Accessor;
+import org.apache.qpid.configuration.Accessor.MapAccessor;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.address.AddressHelper;
+
+
+public class AddressHelper_0_10 extends AddressHelper
+{
+ protected Accessor declareProps;
+
+ public AddressHelper_0_10(Address address)
+ {
+ super(address);
+ }
+
+ // Node properties --------------------
+
+ public boolean isNodeExclusive()
+ {
+ Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,EXCLUSIVE));
+ return b == null ? false : b.booleanValue();
+ }
+
+ public boolean isNodeAutoDelete()
+ {
+ Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,AUTO_DELETE));
+ return b == null ? false : b.booleanValue();
+ }
+
+ public String getNodeAltExchange()
+ {
+ return addressProps.getString(getFQN(NODE,X_DECLARE,ALT_EXCHANGE));
+ }
+
+ public Map getNodeDeclareArgs()
+ {
+ Map map = addressProps.getMap(getFQN(NODE,X_DECLARE,ARGUMENTS));
+ return map == null ? Collections.EMPTY_MAP : map;
+ }
+
+ public String getNodeExchangeType()
+ {
+ String type = addressProps.getString(getFQN(NODE,X_DECLARE,EXCHANGE_TYPE));
+ return type == null ? "topic" : type;
+ }
+
+ public List<Binding> getNodeBindings()
+ {
+ return getBindings(addressProps.getList(getFQN(NODE,X_BINDINGS)));
+ }
+
+ // Link properties --------------------
+
+ public List<Binding> getLinkQueueBindings()
+ {
+ return getBindings(addressProps.getList(getFQN(LINK,X_BINDINGS)));
+ }
+
+ public Map getLinkQueueDeclareArgs()
+ {
+ Map map = addressProps.getMap(getFQN(LINK,X_DECLARE,ARGUMENTS));
+ return map == null ? Collections.EMPTY_MAP : map;
+ }
+
+ public Boolean isLinkQueueExclusive()
+ {
+ return addressProps.getBoolean(getFQN(LINK,X_DECLARE,EXCLUSIVE));
+ }
+
+ public Boolean isLinkQueueAutoDelete()
+ {
+ return addressProps.getBoolean(getFQN(LINK,X_DECLARE,AUTO_DELETE));
+ }
+
+ public String getLinkQueueAltExchange()
+ {
+ return addressProps.getString(getFQN(LINK,X_DECLARE,ALT_EXCHANGE));
+ }
+
+ public Boolean isSubscriptionExclusive()
+ {
+ return addressProps.getBoolean(getFQN(LINK,X_SUBSCRIBE,EXCLUSIVE));
+ }
+
+ public Map getSubscriptionArguments()
+ {
+ Map m = addressProps.getMap(getFQN(LINK,X_SUBSCRIBE,ARGUMENTS));
+ return m == null ? Collections.EMPTY_MAP : m;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Binding> getBindings(List<Map> bindingList)
+ {
+ List<Binding> bindings = new ArrayList<Binding>();
+ if (bindingList != null)
+ {
+ for (Map map : bindingList)
+ {
+ MapAccessor bindingMap = new MapAccessor(map);
+ Binding binding = new Binding(
+ bindingMap.getString(EXCHANGE),
+ bindingMap.getString(QUEUE),
+ bindingMap.getString(KEY),
+ bindingMap.getMap(ARGUMENTS) == null ? Collections.EMPTY_MAP
+ : bindingMap.getMap(ARGUMENTS));
+ bindings.add(binding);
+ }
+ }
+ return bindings;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java
new file mode 100644
index 0000000000..8d46c29d4e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+public final class AddressProperty_0_10
+{
+ public static final String X_DECLARE = "x-declare";
+ public static final String X_BINDINGS = "x-bindings";
+ public static final String X_SUBSCRIBE = "x-subscribes";
+
+ public static final String EXCLUSIVE = "exclusive";
+ public static final String AUTO_DELETE = "auto-delete";
+ public static final String ALT_EXCHANGE = "alternate-exchange";
+ public static final String EXCHANGE_TYPE = "type";
+
+ public static final String BINDINGS = "bindings";
+ public static final String EXCHANGE = "exchange";
+ public static final String QUEUE = "queue";
+ public static final String KEY = "key";
+ public static final String ARGUMENTS = "arguments";
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java
new file mode 100644
index 0000000000..acd5fbb4d4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressResolver;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.Session;
+
+public class AddressResolver_0_10 implements AddressResolver
+{
+ Session ssn;
+
+ public AddressResolver_0_10(Session session)
+ {
+ this.ssn = session;
+ }
+
+ /**
+ * 1. Check if it's an exchange (topic) or a queue
+ * 2. Create a QpidTopic or a QpidQueue based on that
+ */
+ public QpidDestination resolve(Address address) throws AddressException
+ {
+ AddressHelper_0_10 helper = new AddressHelper_0_10(address);
+
+ if (!address.isResolved())
+ {
+ checkAddressType(address,helper);
+ address.setResolved(true);
+ }
+
+ Link_0_10 link = new Link_0_10(helper);
+ Node_0_10 node = (address.getAddressType() == AddressType.TOPIC_ADDRESS) ?
+ new ExchangeNode(helper) : new QueueNode(helper);
+
+ try
+ {
+ QpidDestination dest = (QpidDestination) ((address.getAddressType() == AddressType.TOPIC_ADDRESS) ?
+ new QpidTopic_0_10(address,(ExchangeNode)node,link):
+ new QpidQueue_0_10(address,(QueueNode)node,link));
+
+ return dest;
+ }
+ catch(Exception e)
+ {
+ throw new AddressException("Error creating destination impl",e);
+ }
+ }
+
+ private void checkAddressType(Address address,AddressHelper_0_10 helper) throws AddressException
+ {
+ ExchangeBoundResult result = ssn.exchangeBound(address.getName(),address.getName(),
+ null,null).get();
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name;
+ //treat it as a queue unless a type is specified.
+ if (helper.getNodeType() == AddressType.UNSPECIFIED)
+ {
+ address.setAddressType(AddressType.QUEUE_ADDRESS);
+ }
+ else
+ {
+ address.setAddressType(AddressType.TOPIC_ADDRESS);
+ }
+ }
+ else if (result.getExchangeNotFound())
+ {
+ //name refers to a queue
+ address.setAddressType(AddressType.QUEUE_ADDRESS);
+ }
+ else if (result.getQueueNotFound())
+ {
+ //name refers to an exchange
+ address.setAddressType(AddressType.TOPIC_ADDRESS);
+ }
+ else
+ {
+ //both a queue and exchange exist for that name
+ if (helper.getNodeType() == AddressType.UNSPECIFIED)
+ {
+ throw new AddressException("Ambiguous address, please specify a node type. Ex type:{queue|topic}");
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java
new file mode 100644
index 0000000000..46c789a4e7
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+public class Binding
+{
+ String exchange;
+ String bindingKey;
+ String queue;
+ Map<String,Object> args;
+
+ public Binding(String exchange,
+ String queue,
+ String bindingKey,
+ Map<String,Object> args)
+ {
+ this.exchange = exchange;
+ this.queue = queue;
+ this.bindingKey = bindingKey;
+ this.args = args;
+ }
+
+ public String getExchange()
+ {
+ return exchange;
+ }
+
+ public String getQueue()
+ {
+ return queue;
+ }
+
+ public String getBindingKey()
+ {
+ return bindingKey;
+ }
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java
new file mode 100644
index 0000000000..633fd3ed24
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+
+public class ExchangeNode extends Node_0_10
+{
+ private String exchangeType;
+
+ public ExchangeNode(AddressHelper_0_10 helper)
+ {
+ super(helper);
+ exchangeType = helper.getNodeExchangeType();
+ }
+
+ public String getExchangeType()
+ {
+ return exchangeType;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java
new file mode 100644
index 0000000000..c8df601c48
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.Link;
+
+public class Link_0_10 extends Link
+{
+ private Subscription subscription;
+ private List<Binding> bindings;
+
+ private Boolean autoDelete = null;
+ private String altExchange;
+ private Boolean exclusive = null;
+ private Map<String,Object> declareArgs;
+
+ @SuppressWarnings("unchecked")
+ public Link_0_10(AddressHelper_0_10 helper) throws AddressException
+ {
+ super(helper);
+ autoDelete = helper.isLinkQueueAutoDelete();
+ exclusive = helper.isLinkQueueExclusive();
+ altExchange = helper.getLinkQueueAltExchange();
+ declareArgs = helper.getLinkQueueDeclareArgs();
+ bindings = helper.getLinkQueueBindings();
+ subscription = new Subscription();
+ subscription.setExclusive(helper.isSubscriptionExclusive());
+ subscription.setArgs(helper.getSubscriptionArguments());
+ }
+
+ public List<Binding> getQueueBindings()
+ {
+ return bindings;
+ }
+
+ public Boolean isQueueAutoDelete()
+ {
+ return autoDelete;
+ }
+
+ public String getQueueAltExchange()
+ {
+ return altExchange;
+ }
+
+ public Boolean isQueueExclusive()
+ {
+ return exclusive;
+ }
+
+ public Map<String, Object> getQueueDeclareArgs()
+ {
+ return declareArgs;
+ }
+
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private Boolean exclusive = null;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public Boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(Boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java
new file mode 100644
index 0000000000..f97b8c11b4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.messaging.address.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Node_0_10 extends Node
+{
+ private static final Logger _logger = LoggerFactory.getLogger(Node_0_10.class);
+
+ private boolean autoDelete = false;
+ private String altExchange;
+ private Map<String,Object> declareArgs;
+
+ public Node_0_10(AddressHelper_0_10 helper)
+ {
+ super(helper);
+ declareArgs = helper.getNodeDeclareArgs();
+ autoDelete = helper.isNodeAutoDelete();
+ altExchange = helper.getNodeAltExchange();
+ }
+
+ public boolean isAutoDelete()
+ {
+ return autoDelete;
+ }
+
+ public String getAltExchange()
+ {
+ return altExchange;
+ }
+
+ public Map<String, Object> getDeclareArgs()
+ {
+ return declareArgs;
+ }
+
+ public boolean matchProps(Map<String,Object> target)
+ {
+ boolean match = true;
+ Map<String,Object> source = declareArgs;
+ for (String key: source.keySet())
+ {
+ match = target.containsKey(key) &&
+ target.get(key).equals(source.get(key));
+
+ if (!match)
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Property given in address did not match with the args sent by the broker.");
+ buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
+ buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
+ _logger.debug(buf.toString());
+ return match;
+ }
+ }
+
+ return match;
+ }
+} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java
new file mode 100644
index 0000000000..4c2f94ae59
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressHelper;
+import org.apache.qpid.messaging.address.Link;
+import org.apache.qpid.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
+import org.apache.qpid.messaging.QpidQueue;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.SessionException;
+
+public class QpidQueue_0_10 extends QpidQueue
+{
+ private Address address;
+ private QueueNode queue;
+ private Link_0_10 link;
+
+ public QpidQueue_0_10(Address address,QueueNode queue,Link_0_10 link) throws Exception
+ {
+ this.address = address;
+ this.queue = queue;
+ this.link = link;
+ queueName = address.getName();
+ }
+
+ @Override
+ public void checkCreate(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(queue.getCreatePolicy(), mode))
+ {
+ ssn.queueDeclare(queueName,
+ queue.getAltExchange(),
+ queue.getDeclareArgs(),
+ queue.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ queue.isDurable() ? Option.DURABLE : Option.NONE,
+ queue.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ if (queue.getBindings().size() > 0)
+ {
+ for (Binding binding: queue.getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ queueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ "anq.topic" :
+ binding.getExchange();
+
+ ssn.exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ ssn.queueDeclare(queueName, null, null,Option.PASSIVE);
+ ssn.sync();
+ }
+ catch(SessionException e)
+ {
+ if (e.getException() != null
+ && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
+ {
+ throw new AddressException("The Queue '" + queueName +"' does not exist",e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void checkAssert(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(queue.getAssertPolicy(), mode))
+ {
+ boolean match = false;
+ try
+ {
+ QueueQueryResult result = ssn.queueQuery(queueName, Option.NONE).get();
+
+ match = queueName.equals(result.getQueue()) &&
+ (result.getDurable() == queue.isDurable()) &&
+ (result.getAutoDelete() == queue.isAutoDelete()) &&
+ (result.getExclusive() == queue.isExclusive()) &&
+ (queue.matchProps(result.getArguments()));
+ }
+ catch(SessionException e)
+ {
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ {
+ match = false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+ "Error querying queue",e);
+ }
+ }
+ if (!match)
+ {
+ throw new AddressException("The queue described in the address does not exist on the broker");
+ }
+ }
+ }
+
+ @Override
+ public void checkDelete(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(queue.getDeletePolicy(), mode))
+ {
+ ssn.queueDelete(queueName, Option.NONE);
+ ssn.sync();
+ }
+ }
+
+ @Override
+ public void createSubscription(Session session,SubscriptionSettings settings) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ queueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ "amq.topic" :
+ binding.getExchange();
+
+ ssn.exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+
+ SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
+ Map<String,Object> arguments = settings_0_10.getArgs();
+ arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs());
+
+ if (link.getReliability() == Reliability.UNRELIABLE ||
+ link.getReliability() == Reliability.AT_MOST_ONCE)
+ {
+ settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
+ }
+
+ // for queues subscriptions are non exclusive by default.
+ boolean exclusive =
+ (link.getSubscription().isExclusive() == null) ? false : link.getSubscription().isExclusive();
+
+ ssn.messageSubscribe(queueName,
+ settings_0_10.getSubscriptionTag(),
+ settings_0_10.getAcceptMode(),
+ settings_0_10.getAccquireMode(),
+ null, // resume id
+ 0, // resume ttl
+ arguments,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
+ }
+
+ @Override
+ public void deleteSubscription(Session session) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ queueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ "amq.topic" :
+ binding.getExchange();
+
+ ssn.exchangeUnbind(queue,
+ exchange,
+ binding.getBindingKey(),
+ Option.NONE);
+ }
+ }
+
+ // ideally we should cancel the subscription here
+ }
+
+ public String getSubscriptionQueue()
+ {
+ return queueName;
+ }
+
+ public long getConsumerCapacity()
+ {
+ return link.getConsumerCapacity();
+ }
+
+ public long getProducerCapacity()
+ {
+ return link.getProducerCapacity();
+ }
+
+ public String toString()
+ {
+ return address.toString();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
new file mode 100644
index 0000000000..b9195220d4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.QpidTopic;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressHelper;
+import org.apache.qpid.messaging.address.Link;
+import org.apache.qpid.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.SessionException;
+
+public class QpidTopic_0_10 extends QpidTopic
+{
+ private String exchangeName;
+ private Session ssn;
+ private Address address;
+ private ExchangeNode exchange;
+ private Link_0_10 link;
+ private String subscriptionQueue;
+
+ public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception
+ {
+ if (Reliability.AT_LEAST_ONCE == link.getReliability())
+ {
+ throw new Exception("AT-LEAST-ONCE is not yet supported for Topics");
+ }
+
+ this.address = address;
+ this.exchange = exchange;
+ this.link = link;
+ this.exchangeName = address.getName();
+ topicName = retrieveTopicName();
+ }
+
+ @Override
+ public void checkCreate(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode))
+ {
+ ssn.exchangeDeclare(exchangeName,
+ exchange.getExchangeType(),
+ exchange.getAltExchange(),
+ exchange.getDeclareArgs(),
+ exchangeName.toString().startsWith("amq.") ?
+ Option.PASSIVE : Option.NONE);
+ }
+ else
+ {
+ try
+ {
+ ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE);
+ ssn.sync();
+ }
+ catch(SessionException e)
+ {
+ if (e.getException() != null
+ && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
+ {
+ throw new AddressException("The exchange '" + exchangeName +"' does not exist",e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void checkAssert(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode))
+ {
+ ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get();
+ boolean match = !result.getNotFound() &&
+ (result.getDurable() == exchange.isDurable()) &&
+ (exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) &&
+ (exchange.matchProps(result.getArguments()));
+
+ if (!match)
+ {
+ throw new AddressException("The exchange described by the address does not exist on the broker");
+ }
+ }
+ }
+
+ @Override
+ public void checkDelete(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) &&
+ !exchangeName.toString().startsWith("amq."))
+ {
+ ssn.exchangeDelete(exchangeName, Option.NONE);
+ ssn.sync();
+ }
+ }
+
+ @Override
+ public void createSubscription(Session session,SubscriptionSettings settings) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+ subscriptionQueue = getSubscriptionQueueName();
+
+ // for topics subscriptions queues are exclusive by default.
+ boolean exclusive =
+ (link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ // for topics subscriptions are autoDelete by default.
+ boolean autoDelete =
+ (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ ssn.queueDeclare(subscriptionQueue,
+ link.getQueueAltExchange(),
+ link.getQueueDeclareArgs(),
+ autoDelete ? Option.AUTO_DELETE : Option.NONE,
+ link.isDurable() ? Option.DURABLE : Option.NONE,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ subscriptionQueue: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ address.getName() :
+ binding.getExchange();
+
+ ssn.exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+ else
+ {
+ String subject = address.getSubject();
+ ssn.exchangeBind(subscriptionQueue,
+ address.getName(),
+ subject == null || subject.trim().equals("") ? "#" : subject,
+ null);
+ }
+
+ SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
+ Map<String,Object> arguments = settings_0_10.getArgs();
+ arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs());
+
+ if (link.getReliability() == Reliability.UNRELIABLE ||
+ link.getReliability() == Reliability.AT_MOST_ONCE)
+ {
+ settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
+ }
+
+ // for topics subscriptions are exclusive by default.
+ boolean exclusiveConsume =
+ (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ ssn.messageSubscribe(subscriptionQueue,
+ settings_0_10.getSubscriptionTag(),
+ settings_0_10.getAcceptMode(),
+ settings_0_10.getAccquireMode(),
+ null, // resume id
+ 0, // resume ttl
+ arguments,
+ exclusiveConsume ? Option.EXCLUSIVE : Option.NONE);
+ }
+
+ @Override
+ public void deleteSubscription(Session session) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ subscriptionQueue: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ "anq.topic" :
+ binding.getExchange();
+
+ ssn.exchangeUnbind(queue,
+ exchange,
+ binding.getBindingKey(),
+ Option.NONE);
+ }
+ }
+ ssn.queueDelete(subscriptionQueue, Option.NONE);
+ }
+
+ private String getSubscriptionQueueName()
+ {
+ if (link.getName() == null)
+ {
+ return "TempQueue" + UUID.randomUUID();
+ }
+ else
+ {
+ return link.getName();
+ }
+ }
+
+ private String retrieveTopicName()
+ {
+ if (address.getSubject() != null && !address.getSubject().trim().equals(""))
+ {
+ return address.getSubject();
+ }
+ else if ("topic".equals(exchange.getExchangeType()))
+ {
+ return "#";
+ }
+ else
+ {
+ return "";
+ }
+ }
+
+ public String getSubscriptionQueue()
+ {
+ return subscriptionQueue;
+ }
+
+ public long getConsumerCapacity()
+ {
+ return link.getConsumerCapacity();
+ }
+
+ public long getProducerCapacity()
+ {
+ return link.getProducerCapacity();
+ }
+
+ public String toString()
+ {
+ return address.toString();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java
new file mode 100644
index 0000000000..04f0911f44
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.List;
+
+import org.apache.qpid.messaging.Address;
+
+public class QueueNode extends Node_0_10
+{
+ private boolean exclusive = false;
+ private List<Binding> bindings;
+
+ public QueueNode(AddressHelper_0_10 helper)
+ {
+ super(helper);
+ exclusive = helper.isNodeExclusive();
+ bindings = helper.getNodeBindings();
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public List<Binding> getBindings()
+ {
+ return bindings;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java
new file mode 100644
index 0000000000..cbb51c637b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+
+public class SubscriptionSettings_0_10 implements SubscriptionSettings
+{
+ String messageSelector;
+ String subscriptionTag;
+ MessageAcceptMode acceptMode;
+ MessageAcquireMode accquireMode;
+ Map<String,Object> args;
+
+ public String getMessageSelector()
+ {
+ return messageSelector;
+ }
+
+ public void setMessageSelector(String messageSelector)
+ {
+ this.messageSelector = messageSelector;
+ }
+
+ public String getSubscriptionTag()
+ {
+ return subscriptionTag;
+ }
+
+ public void setSubscriptionTag(String subscriptionTag)
+ {
+ this.subscriptionTag = subscriptionTag;
+ }
+
+ public MessageAcceptMode getAcceptMode()
+ {
+ return acceptMode;
+ }
+
+ public void setAcceptMode(MessageAcceptMode acceptMode)
+ {
+ this.acceptMode = acceptMode;
+ }
+
+ public MessageAcquireMode getAccquireMode()
+ {
+ return accquireMode;
+ }
+
+ public void setAccquireMode(MessageAcquireMode accquireMode)
+ {
+ this.accquireMode = accquireMode;
+ }
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+}