diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/messaging/address')
17 files changed, 1669 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java new file mode 100644 index 0000000000..c1b2000db0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java @@ -0,0 +1,14 @@ +package org.apache.qpid.messaging.address; + +public class AddressException extends Exception +{ + public AddressException(String message) + { + super(message); + } + + public AddressException(String message, Throwable cause) + { + super(message,cause); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java new file mode 100644 index 0000000000..7f5b1f9cd1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import static org.apache.qpid.messaging.address.AddressProperty.*; + +import org.apache.qpid.configuration.Accessor; +import org.apache.qpid.configuration.Accessor.NestedMapAccessor; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AddressHelper +{ + private static final Logger _logger = LoggerFactory.getLogger(AddressHelper.class); + + protected Address address; + protected Accessor addressProps; + + public AddressHelper(Address address) + { + this.address = address; + addressProps = new NestedMapAccessor(address.getOptions()); + } + + public PolicyType getCreate() + { + return PolicyType.getPolicyType(addressProps.getString(CREATE)); + } + + public PolicyType getAssert() + { + return PolicyType.getPolicyType(addressProps.getString(ASSERT)); + } + + public PolicyType getDelete() + { + return PolicyType.getPolicyType(addressProps.getString(DELETE)); + } + + public boolean isBrowseOnly() + { + String mode = addressProps.getString(MODE); + return mode != null && mode.equals(BROWSE) ? true : false; + } + + public AddressType getNodeType() + { + String type = addressProps.getString(getFQN(NODE,TYPE)); + if ("topic".equalsIgnoreCase(type)) + { + return AddressType.TOPIC_ADDRESS; + } + else if ("queue".equalsIgnoreCase(type)) + { + return AddressType.QUEUE_ADDRESS; + } + else + { + return AddressType.UNSPECIFIED; + } + } + + public boolean isNodeDurable() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,DURABLE)); + return b == null ? false : b.booleanValue(); + } + + public String getLinkName() + { + return addressProps.getString(getFQN(LINK,NAME)); + } + + public boolean isLinkDurable() + { + Boolean b = addressProps.getBoolean(getFQN(LINK,DURABLE)); + return b == null ? false : b.booleanValue(); + } + + public String getLinkReliability() + { + return addressProps.getString(getFQN(LINK,RELIABILITY)); + } + + public int getLinkProducerCapacity() + { + return getCapacity(CheckMode.FOR_SENDER); + } + + public int getLinkConsumerCapacity() + { + return getCapacity(CheckMode.FOR_RECEIVER); + } + + private int getCapacity(CheckMode mode) + { + int capacity = 0; + try + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY)); + } + catch(Exception e) + { + try + { + if (mode == CheckMode.FOR_RECEIVER) + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_SOURCE)); + } + else + { + capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_TARGET)); + } + } + catch(Exception ex) + { + if (ex instanceof NumberFormatException && !ex.getMessage().equals("null")) + { + _logger.info("Unable to retrieve capacity from address: " + address,ex); + } + } + } + + return capacity; + } + + public static boolean isAllowed(PolicyType policy, CheckMode mode) + { + return (policy == PolicyType.ALWAYS || + (policy == PolicyType.RECEIVER && mode == CheckMode.FOR_RECEIVER) || + (policy == PolicyType.SENDER && mode == CheckMode.FOR_SENDER) + ); + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java new file mode 100644 index 0000000000..26b0f2e676 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +public final class AddressProperty +{ + public static final String NODE = "node"; + public static final String LINK = "link"; + public static final String CREATE = "create"; + public static final String ASSERT = "assert"; + public static final String DELETE = "delete"; + public static final String FILTER = "filter"; + public static final String NO_LOCAL = "no-local"; + public static final String DURABLE = "durable"; + public static final String TYPE = "type"; + public static final String BROWSE = "browse"; + public static final String MODE = "mode"; + public static final String CAPACITY = "capacity"; + public static final String CAPACITY_SOURCE = "source"; + public static final String CAPACITY_TARGET = "target"; + public static final String NAME = "name"; + public static final String RELIABILITY = "reliability"; + + public static String getFQN(String... propNames) + { + StringBuilder sb = new StringBuilder(); + for(String prop: propNames) + { + sb.append(prop).append("/"); + } + return sb.substring(0, sb.length() -1); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java new file mode 100644 index 0000000000..d7cf23138b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination; +public interface AddressResolver +{ + public QpidDestination resolve(Address address) throws AddressException; + +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java new file mode 100644 index 0000000000..55eb86372a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import static org.apache.qpid.messaging.address.Link.Reliability.UNSPECIFIED; + +public class Link +{ + public enum FilterType { SQL92, XQUERY, SUBJECT } + + public enum Reliability + { + UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED; + + public static Reliability getReliability(String reliability) throws AddressException + { + if (reliability == null) + { + return UNSPECIFIED; + } + else if (reliability.equalsIgnoreCase("unreliable")) + { + return UNRELIABLE; + } + else if (reliability.equalsIgnoreCase("at-least-once")) + { + return AT_LEAST_ONCE; + } + else + { + throw new AddressException("The reliability mode '" + + reliability + "' is not yet supported"); + } + } + } + + protected String name; + protected String filter; + protected FilterType filterType = FilterType.SUBJECT; + protected boolean noLocal; + protected boolean durable; + protected int consumerCapacity = 0; + protected int producerCapacity = 0; + protected Reliability reliability = UNSPECIFIED; + + public Link(AddressHelper helper) throws AddressException + { + name = helper.getLinkName(); + durable = helper.isLinkDurable(); + reliability = Reliability.getReliability(helper.getLinkReliability()); + consumerCapacity = helper.getLinkConsumerCapacity(); + producerCapacity = helper.getLinkProducerCapacity(); + } + + public Reliability getReliability() + { + return reliability; + } + + public boolean isDurable() + { + return durable; + } + + public String getFilter() + { + return filter; + } + + public FilterType getFilterType() + { + return filterType; + } + + public boolean isNoLocal() + { + return noLocal; + } + + public int getConsumerCapacity() + { + return consumerCapacity; + } + + public int getProducerCapacity() + { + return producerCapacity; + } + + public String getName() + { + return name; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java new file mode 100644 index 0000000000..29c65f0a1d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address; + +import org.apache.qpid.messaging.Address.PolicyType; + +public class Node +{ + boolean durable = false; + PolicyType createPolicy = PolicyType.NEVER; + PolicyType assertPolicy = PolicyType.NEVER; + PolicyType deletePolicy = PolicyType.NEVER; + + public Node(AddressHelper helper) + { + durable = helper.isNodeDurable(); + createPolicy = helper.getCreate(); + assertPolicy = helper.getAssert(); + deletePolicy = helper.getDelete(); + } + + public boolean isDurable() + { + return durable; + } + + public PolicyType getCreatePolicy() + { + return createPolicy; + } + + public PolicyType getAssertPolicy() + { + return assertPolicy; + } + + public PolicyType getDeletePolicy() + { + return deletePolicy; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java new file mode 100644 index 0000000000..243b2365e5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import static org.apache.qpid.messaging.address.AddressProperty.*; +import static org.apache.qpid.messaging.address.amqp_0_10.AddressProperty_0_10.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.configuration.Accessor; +import org.apache.qpid.configuration.Accessor.MapAccessor; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.address.AddressHelper; + + +public class AddressHelper_0_10 extends AddressHelper +{ + protected Accessor declareProps; + + public AddressHelper_0_10(Address address) + { + super(address); + } + + // Node properties -------------------- + + public boolean isNodeExclusive() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,EXCLUSIVE)); + return b == null ? false : b.booleanValue(); + } + + public boolean isNodeAutoDelete() + { + Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,AUTO_DELETE)); + return b == null ? false : b.booleanValue(); + } + + public String getNodeAltExchange() + { + return addressProps.getString(getFQN(NODE,X_DECLARE,ALT_EXCHANGE)); + } + + public Map getNodeDeclareArgs() + { + Map map = addressProps.getMap(getFQN(NODE,X_DECLARE,ARGUMENTS)); + return map == null ? Collections.EMPTY_MAP : map; + } + + public String getNodeExchangeType() + { + String type = addressProps.getString(getFQN(NODE,X_DECLARE,EXCHANGE_TYPE)); + return type == null ? "topic" : type; + } + + public List<Binding> getNodeBindings() + { + return getBindings(addressProps.getList(getFQN(NODE,X_BINDINGS))); + } + + // Link properties -------------------- + + public List<Binding> getLinkQueueBindings() + { + return getBindings(addressProps.getList(getFQN(LINK,X_BINDINGS))); + } + + public Map getLinkQueueDeclareArgs() + { + Map map = addressProps.getMap(getFQN(LINK,X_DECLARE,ARGUMENTS)); + return map == null ? Collections.EMPTY_MAP : map; + } + + public Boolean isLinkQueueExclusive() + { + return addressProps.getBoolean(getFQN(LINK,X_DECLARE,EXCLUSIVE)); + } + + public Boolean isLinkQueueAutoDelete() + { + return addressProps.getBoolean(getFQN(LINK,X_DECLARE,AUTO_DELETE)); + } + + public String getLinkQueueAltExchange() + { + return addressProps.getString(getFQN(LINK,X_DECLARE,ALT_EXCHANGE)); + } + + public Boolean isSubscriptionExclusive() + { + return addressProps.getBoolean(getFQN(LINK,X_SUBSCRIBE,EXCLUSIVE)); + } + + public Map getSubscriptionArguments() + { + Map m = addressProps.getMap(getFQN(LINK,X_SUBSCRIBE,ARGUMENTS)); + return m == null ? Collections.EMPTY_MAP : m; + } + + @SuppressWarnings("unchecked") + private List<Binding> getBindings(List<Map> bindingList) + { + List<Binding> bindings = new ArrayList<Binding>(); + if (bindingList != null) + { + for (Map map : bindingList) + { + MapAccessor bindingMap = new MapAccessor(map); + Binding binding = new Binding( + bindingMap.getString(EXCHANGE), + bindingMap.getString(QUEUE), + bindingMap.getString(KEY), + bindingMap.getMap(ARGUMENTS) == null ? Collections.EMPTY_MAP + : bindingMap.getMap(ARGUMENTS)); + bindings.add(binding); + } + } + return bindings; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java new file mode 100644 index 0000000000..8d46c29d4e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +public final class AddressProperty_0_10 +{ + public static final String X_DECLARE = "x-declare"; + public static final String X_BINDINGS = "x-bindings"; + public static final String X_SUBSCRIBE = "x-subscribes"; + + public static final String EXCLUSIVE = "exclusive"; + public static final String AUTO_DELETE = "auto-delete"; + public static final String ALT_EXCHANGE = "alternate-exchange"; + public static final String EXCHANGE_TYPE = "type"; + + public static final String BINDINGS = "bindings"; + public static final String EXCHANGE = "exchange"; + public static final String QUEUE = "queue"; + public static final String KEY = "key"; + public static final String ARGUMENTS = "arguments"; +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java new file mode 100644 index 0000000000..acd5fbb4d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Address.AddressType; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressResolver; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.Session; + +public class AddressResolver_0_10 implements AddressResolver +{ + Session ssn; + + public AddressResolver_0_10(Session session) + { + this.ssn = session; + } + + /** + * 1. Check if it's an exchange (topic) or a queue + * 2. Create a QpidTopic or a QpidQueue based on that + */ + public QpidDestination resolve(Address address) throws AddressException + { + AddressHelper_0_10 helper = new AddressHelper_0_10(address); + + if (!address.isResolved()) + { + checkAddressType(address,helper); + address.setResolved(true); + } + + Link_0_10 link = new Link_0_10(helper); + Node_0_10 node = (address.getAddressType() == AddressType.TOPIC_ADDRESS) ? + new ExchangeNode(helper) : new QueueNode(helper); + + try + { + QpidDestination dest = (QpidDestination) ((address.getAddressType() == AddressType.TOPIC_ADDRESS) ? + new QpidTopic_0_10(address,(ExchangeNode)node,link): + new QpidQueue_0_10(address,(QueueNode)node,link)); + + return dest; + } + catch(Exception e) + { + throw new AddressException("Error creating destination impl",e); + } + } + + private void checkAddressType(Address address,AddressHelper_0_10 helper) throws AddressException + { + ExchangeBoundResult result = ssn.exchangeBound(address.getName(),address.getName(), + null,null).get(); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name; + //treat it as a queue unless a type is specified. + if (helper.getNodeType() == AddressType.UNSPECIFIED) + { + address.setAddressType(AddressType.QUEUE_ADDRESS); + } + else + { + address.setAddressType(AddressType.TOPIC_ADDRESS); + } + } + else if (result.getExchangeNotFound()) + { + //name refers to a queue + address.setAddressType(AddressType.QUEUE_ADDRESS); + } + else if (result.getQueueNotFound()) + { + //name refers to an exchange + address.setAddressType(AddressType.TOPIC_ADDRESS); + } + else + { + //both a queue and exchange exist for that name + if (helper.getNodeType() == AddressType.UNSPECIFIED) + { + throw new AddressException("Ambiguous address, please specify a node type. Ex type:{queue|topic}"); + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java new file mode 100644 index 0000000000..46c789a4e7 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +public class Binding +{ + String exchange; + String bindingKey; + String queue; + Map<String,Object> args; + + public Binding(String exchange, + String queue, + String bindingKey, + Map<String,Object> args) + { + this.exchange = exchange; + this.queue = queue; + this.bindingKey = bindingKey; + this.args = args; + } + + public String getExchange() + { + return exchange; + } + + public String getQueue() + { + return queue; + } + + public String getBindingKey() + { + return bindingKey; + } + + public Map<String, Object> getArgs() + { + return args; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java new file mode 100644 index 0000000000..633fd3ed24 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + + +public class ExchangeNode extends Node_0_10 +{ + private String exchangeType; + + public ExchangeNode(AddressHelper_0_10 helper) + { + super(helper); + exchangeType = helper.getNodeExchangeType(); + } + + public String getExchangeType() + { + return exchangeType; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java new file mode 100644 index 0000000000..c8df601c48 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.Link; + +public class Link_0_10 extends Link +{ + private Subscription subscription; + private List<Binding> bindings; + + private Boolean autoDelete = null; + private String altExchange; + private Boolean exclusive = null; + private Map<String,Object> declareArgs; + + @SuppressWarnings("unchecked") + public Link_0_10(AddressHelper_0_10 helper) throws AddressException + { + super(helper); + autoDelete = helper.isLinkQueueAutoDelete(); + exclusive = helper.isLinkQueueExclusive(); + altExchange = helper.getLinkQueueAltExchange(); + declareArgs = helper.getLinkQueueDeclareArgs(); + bindings = helper.getLinkQueueBindings(); + subscription = new Subscription(); + subscription.setExclusive(helper.isSubscriptionExclusive()); + subscription.setArgs(helper.getSubscriptionArguments()); + } + + public List<Binding> getQueueBindings() + { + return bindings; + } + + public Boolean isQueueAutoDelete() + { + return autoDelete; + } + + public String getQueueAltExchange() + { + return altExchange; + } + + public Boolean isQueueExclusive() + { + return exclusive; + } + + public Map<String, Object> getQueueDeclareArgs() + { + return declareArgs; + } + + + public Subscription getSubscription() + { + return this.subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private Boolean exclusive = null; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public Boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(Boolean exclusive) + { + this.exclusive = exclusive; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java new file mode 100644 index 0000000000..f97b8c11b4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.messaging.address.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Node_0_10 extends Node +{ + private static final Logger _logger = LoggerFactory.getLogger(Node_0_10.class); + + private boolean autoDelete = false; + private String altExchange; + private Map<String,Object> declareArgs; + + public Node_0_10(AddressHelper_0_10 helper) + { + super(helper); + declareArgs = helper.getNodeDeclareArgs(); + autoDelete = helper.isNodeAutoDelete(); + altExchange = helper.getNodeAltExchange(); + } + + public boolean isAutoDelete() + { + return autoDelete; + } + + public String getAltExchange() + { + return altExchange; + } + + public Map<String, Object> getDeclareArgs() + { + return declareArgs; + } + + public boolean matchProps(Map<String,Object> target) + { + boolean match = true; + Map<String,Object> source = declareArgs; + for (String key: source.keySet()) + { + match = target.containsKey(key) && + target.get(key).equals(source.get(key)); + + if (!match) + { + StringBuffer buf = new StringBuffer(); + buf.append("Property given in address did not match with the args sent by the broker."); + buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); + buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }"); + _logger.debug(buf.toString()); + return match; + } + } + + return match; + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java new file mode 100644 index 0000000000..4c2f94ae59 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.AMQException; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressHelper; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.amqp_0_10.Session_0_10; +import org.apache.qpid.messaging.QpidQueue; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.SessionException; + +public class QpidQueue_0_10 extends QpidQueue +{ + private Address address; + private QueueNode queue; + private Link_0_10 link; + + public QpidQueue_0_10(Address address,QueueNode queue,Link_0_10 link) throws Exception + { + this.address = address; + this.queue = queue; + this.link = link; + queueName = address.getName(); + } + + @Override + public void checkCreate(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getCreatePolicy(), mode)) + { + ssn.queueDeclare(queueName, + queue.getAltExchange(), + queue.getDeclareArgs(), + queue.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + queue.isDurable() ? Option.DURABLE : Option.NONE, + queue.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + if (queue.getBindings().size() > 0) + { + for (Binding binding: queue.getBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "anq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + } + else + { + try + { + ssn.queueDeclare(queueName, null, null,Option.PASSIVE); + ssn.sync(); + } + catch(SessionException e) + { + if (e.getException() != null + && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) + { + throw new AddressException("The Queue '" + queueName +"' does not exist",e); + } + else + { + throw e; + } + } + } + } + + @Override + public void checkAssert(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getAssertPolicy(), mode)) + { + boolean match = false; + try + { + QueueQueryResult result = ssn.queueQuery(queueName, Option.NONE).get(); + + match = queueName.equals(result.getQueue()) && + (result.getDurable() == queue.isDurable()) && + (result.getAutoDelete() == queue.isAutoDelete()) && + (result.getExclusive() == queue.isExclusive()) && + (queue.matchProps(result.getArguments())); + } + catch(SessionException e) + { + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + { + match = false; + } + else + { + throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), + "Error querying queue",e); + } + } + if (!match) + { + throw new AddressException("The queue described in the address does not exist on the broker"); + } + } + } + + @Override + public void checkDelete(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getDeletePolicy(), mode)) + { + ssn.queueDelete(queueName, Option.NONE); + ssn.sync(); + } + } + + @Override + public void createSubscription(Session session,SubscriptionSettings settings) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings; + Map<String,Object> arguments = settings_0_10.getArgs(); + arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); + + if (link.getReliability() == Reliability.UNRELIABLE || + link.getReliability() == Reliability.AT_MOST_ONCE) + { + settings_0_10.setAcceptMode(MessageAcceptMode.NONE); + } + + // for queues subscriptions are non exclusive by default. + boolean exclusive = + (link.getSubscription().isExclusive() == null) ? false : link.getSubscription().isExclusive(); + + ssn.messageSubscribe(queueName, + settings_0_10.getSubscriptionTag(), + settings_0_10.getAcceptMode(), + settings_0_10.getAccquireMode(), + null, // resume id + 0, // resume ttl + arguments, + exclusive ? Option.EXCLUSIVE : Option.NONE); + } + + @Override + public void deleteSubscription(Session session) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeUnbind(queue, + exchange, + binding.getBindingKey(), + Option.NONE); + } + } + + // ideally we should cancel the subscription here + } + + public String getSubscriptionQueue() + { + return queueName; + } + + public long getConsumerCapacity() + { + return link.getConsumerCapacity(); + } + + public long getProducerCapacity() + { + return link.getProducerCapacity(); + } + + public String toString() + { + return address.toString(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java new file mode 100644 index 0000000000..b9195220d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java @@ -0,0 +1,278 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.messaging.Address.PolicyType; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.QpidTopic; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressHelper; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.amqp_0_10.Session_0_10; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.SessionException; + +public class QpidTopic_0_10 extends QpidTopic +{ + private String exchangeName; + private Session ssn; + private Address address; + private ExchangeNode exchange; + private Link_0_10 link; + private String subscriptionQueue; + + public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception + { + if (Reliability.AT_LEAST_ONCE == link.getReliability()) + { + throw new Exception("AT-LEAST-ONCE is not yet supported for Topics"); + } + + this.address = address; + this.exchange = exchange; + this.link = link; + this.exchangeName = address.getName(); + topicName = retrieveTopicName(); + } + + @Override + public void checkCreate(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode)) + { + ssn.exchangeDeclare(exchangeName, + exchange.getExchangeType(), + exchange.getAltExchange(), + exchange.getDeclareArgs(), + exchangeName.toString().startsWith("amq.") ? + Option.PASSIVE : Option.NONE); + } + else + { + try + { + ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE); + ssn.sync(); + } + catch(SessionException e) + { + if (e.getException() != null + && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) + { + throw new AddressException("The exchange '" + exchangeName +"' does not exist",e); + } + else + { + throw e; + } + } + } + } + + @Override + public void checkAssert(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode)) + { + ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get(); + boolean match = !result.getNotFound() && + (result.getDurable() == exchange.isDurable()) && + (exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) && + (exchange.matchProps(result.getArguments())); + + if (!match) + { + throw new AddressException("The exchange described by the address does not exist on the broker"); + } + } + } + + @Override + public void checkDelete(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) && + !exchangeName.toString().startsWith("amq.")) + { + ssn.exchangeDelete(exchangeName, Option.NONE); + ssn.sync(); + } + } + + @Override + public void createSubscription(Session session,SubscriptionSettings settings) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + subscriptionQueue = getSubscriptionQueueName(); + + // for topics subscriptions queues are exclusive by default. + boolean exclusive = + (link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive(); + + // for topics subscriptions are autoDelete by default. + boolean autoDelete = + (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive(); + + ssn.queueDeclare(subscriptionQueue, + link.getQueueAltExchange(), + link.getQueueDeclareArgs(), + autoDelete ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + subscriptionQueue: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + address.getName() : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + else + { + String subject = address.getSubject(); + ssn.exchangeBind(subscriptionQueue, + address.getName(), + subject == null || subject.trim().equals("") ? "#" : subject, + null); + } + + SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings; + Map<String,Object> arguments = settings_0_10.getArgs(); + arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); + + if (link.getReliability() == Reliability.UNRELIABLE || + link.getReliability() == Reliability.AT_MOST_ONCE) + { + settings_0_10.setAcceptMode(MessageAcceptMode.NONE); + } + + // for topics subscriptions are exclusive by default. + boolean exclusiveConsume = + (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive(); + + ssn.messageSubscribe(subscriptionQueue, + settings_0_10.getSubscriptionTag(), + settings_0_10.getAcceptMode(), + settings_0_10.getAccquireMode(), + null, // resume id + 0, // resume ttl + arguments, + exclusiveConsume ? Option.EXCLUSIVE : Option.NONE); + } + + @Override + public void deleteSubscription(Session session) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + subscriptionQueue: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "anq.topic" : + binding.getExchange(); + + ssn.exchangeUnbind(queue, + exchange, + binding.getBindingKey(), + Option.NONE); + } + } + ssn.queueDelete(subscriptionQueue, Option.NONE); + } + + private String getSubscriptionQueueName() + { + if (link.getName() == null) + { + return "TempQueue" + UUID.randomUUID(); + } + else + { + return link.getName(); + } + } + + private String retrieveTopicName() + { + if (address.getSubject() != null && !address.getSubject().trim().equals("")) + { + return address.getSubject(); + } + else if ("topic".equals(exchange.getExchangeType())) + { + return "#"; + } + else + { + return ""; + } + } + + public String getSubscriptionQueue() + { + return subscriptionQueue; + } + + public long getConsumerCapacity() + { + return link.getConsumerCapacity(); + } + + public long getProducerCapacity() + { + return link.getProducerCapacity(); + } + + public String toString() + { + return address.toString(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java new file mode 100644 index 0000000000..04f0911f44 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.List; + +import org.apache.qpid.messaging.Address; + +public class QueueNode extends Node_0_10 +{ + private boolean exclusive = false; + private List<Binding> bindings; + + public QueueNode(AddressHelper_0_10 helper) + { + super(helper); + exclusive = helper.isNodeExclusive(); + bindings = helper.getNodeBindings(); + } + + public boolean isExclusive() + { + return exclusive; + } + + public List<Binding> getBindings() + { + return bindings; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java new file mode 100644 index 0000000000..cbb51c637b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; + +public class SubscriptionSettings_0_10 implements SubscriptionSettings +{ + String messageSelector; + String subscriptionTag; + MessageAcceptMode acceptMode; + MessageAcquireMode accquireMode; + Map<String,Object> args; + + public String getMessageSelector() + { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) + { + this.messageSelector = messageSelector; + } + + public String getSubscriptionTag() + { + return subscriptionTag; + } + + public void setSubscriptionTag(String subscriptionTag) + { + this.subscriptionTag = subscriptionTag; + } + + public MessageAcceptMode getAcceptMode() + { + return acceptMode; + } + + public void setAcceptMode(MessageAcceptMode acceptMode) + { + this.acceptMode = acceptMode; + } + + public MessageAcquireMode getAccquireMode() + { + return accquireMode; + } + + public void setAccquireMode(MessageAcquireMode accquireMode) + { + this.accquireMode = accquireMode; + } + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } +} |