diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java new file mode 100644 index 0000000000..4c2f94ae59 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.messaging.address.amqp_0_10; + +import java.util.Map; + +import org.apache.qpid.AMQException; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SubscriptionSettings; +import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.AddressHelper; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.amqp_0_10.Session_0_10; +import org.apache.qpid.messaging.QpidQueue; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.SessionException; + +public class QpidQueue_0_10 extends QpidQueue +{ + private Address address; + private QueueNode queue; + private Link_0_10 link; + + public QpidQueue_0_10(Address address,QueueNode queue,Link_0_10 link) throws Exception + { + this.address = address; + this.queue = queue; + this.link = link; + queueName = address.getName(); + } + + @Override + public void checkCreate(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getCreatePolicy(), mode)) + { + ssn.queueDeclare(queueName, + queue.getAltExchange(), + queue.getDeclareArgs(), + queue.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + queue.isDurable() ? Option.DURABLE : Option.NONE, + queue.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + if (queue.getBindings().size() > 0) + { + for (Binding binding: queue.getBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "anq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + } + else + { + try + { + ssn.queueDeclare(queueName, null, null,Option.PASSIVE); + ssn.sync(); + } + catch(SessionException e) + { + if (e.getException() != null + && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) + { + throw new AddressException("The Queue '" + queueName +"' does not exist",e); + } + else + { + throw e; + } + } + } + } + + @Override + public void checkAssert(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getAssertPolicy(), mode)) + { + boolean match = false; + try + { + QueueQueryResult result = ssn.queueQuery(queueName, Option.NONE).get(); + + match = queueName.equals(result.getQueue()) && + (result.getDurable() == queue.isDurable()) && + (result.getAutoDelete() == queue.isAutoDelete()) && + (result.getExclusive() == queue.isExclusive()) && + (queue.matchProps(result.getArguments())); + } + catch(SessionException e) + { + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + { + match = false; + } + else + { + throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), + "Error querying queue",e); + } + } + if (!match) + { + throw new AddressException("The queue described in the address does not exist on the broker"); + } + } + } + + @Override + public void checkDelete(Session session,CheckMode mode) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (AddressHelper.isAllowed(queue.getDeletePolicy(), mode)) + { + ssn.queueDelete(queueName, Option.NONE); + ssn.sync(); + } + } + + @Override + public void createSubscription(Session session,SubscriptionSettings settings) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings; + Map<String,Object> arguments = settings_0_10.getArgs(); + arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); + + if (link.getReliability() == Reliability.UNRELIABLE || + link.getReliability() == Reliability.AT_MOST_ONCE) + { + settings_0_10.setAcceptMode(MessageAcceptMode.NONE); + } + + // for queues subscriptions are non exclusive by default. + boolean exclusive = + (link.getSubscription().isExclusive() == null) ? false : link.getSubscription().isExclusive(); + + ssn.messageSubscribe(queueName, + settings_0_10.getSubscriptionTag(), + settings_0_10.getAcceptMode(), + settings_0_10.getAccquireMode(), + null, // resume id + 0, // resume ttl + arguments, + exclusive ? Option.EXCLUSIVE : Option.NONE); + } + + @Override + public void deleteSubscription(Session session) throws Exception + { + org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession(); + + if (link.getQueueBindings().size() > 0) + { + for (Binding binding: link.getQueueBindings()) + { + String queue = binding.getQueue() == null? + queueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + "amq.topic" : + binding.getExchange(); + + ssn.exchangeUnbind(queue, + exchange, + binding.getBindingKey(), + Option.NONE); + } + } + + // ideally we should cancel the subscription here + } + + public String getSubscriptionQueue() + { + return queueName; + } + + public long getConsumerCapacity() + { + return link.getConsumerCapacity(); + } + + public long getProducerCapacity() + { + return link.getProducerCapacity(); + } + + public String toString() + { + return address.toString(); + } +} |