summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java278
1 files changed, 278 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
new file mode 100644
index 0000000000..b9195220d4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.QpidTopic;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressHelper;
+import org.apache.qpid.messaging.address.Link;
+import org.apache.qpid.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.SessionException;
+
+public class QpidTopic_0_10 extends QpidTopic
+{
+ private String exchangeName;
+ private Session ssn;
+ private Address address;
+ private ExchangeNode exchange;
+ private Link_0_10 link;
+ private String subscriptionQueue;
+
+ public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception
+ {
+ if (Reliability.AT_LEAST_ONCE == link.getReliability())
+ {
+ throw new Exception("AT-LEAST-ONCE is not yet supported for Topics");
+ }
+
+ this.address = address;
+ this.exchange = exchange;
+ this.link = link;
+ this.exchangeName = address.getName();
+ topicName = retrieveTopicName();
+ }
+
+ @Override
+ public void checkCreate(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode))
+ {
+ ssn.exchangeDeclare(exchangeName,
+ exchange.getExchangeType(),
+ exchange.getAltExchange(),
+ exchange.getDeclareArgs(),
+ exchangeName.toString().startsWith("amq.") ?
+ Option.PASSIVE : Option.NONE);
+ }
+ else
+ {
+ try
+ {
+ ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE);
+ ssn.sync();
+ }
+ catch(SessionException e)
+ {
+ if (e.getException() != null
+ && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
+ {
+ throw new AddressException("The exchange '" + exchangeName +"' does not exist",e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void checkAssert(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode))
+ {
+ ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get();
+ boolean match = !result.getNotFound() &&
+ (result.getDurable() == exchange.isDurable()) &&
+ (exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) &&
+ (exchange.matchProps(result.getArguments()));
+
+ if (!match)
+ {
+ throw new AddressException("The exchange described by the address does not exist on the broker");
+ }
+ }
+ }
+
+ @Override
+ public void checkDelete(Session session,CheckMode mode) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) &&
+ !exchangeName.toString().startsWith("amq."))
+ {
+ ssn.exchangeDelete(exchangeName, Option.NONE);
+ ssn.sync();
+ }
+ }
+
+ @Override
+ public void createSubscription(Session session,SubscriptionSettings settings) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+ subscriptionQueue = getSubscriptionQueueName();
+
+ // for topics subscriptions queues are exclusive by default.
+ boolean exclusive =
+ (link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ // for topics subscriptions are autoDelete by default.
+ boolean autoDelete =
+ (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ ssn.queueDeclare(subscriptionQueue,
+ link.getQueueAltExchange(),
+ link.getQueueDeclareArgs(),
+ autoDelete ? Option.AUTO_DELETE : Option.NONE,
+ link.isDurable() ? Option.DURABLE : Option.NONE,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ subscriptionQueue: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ address.getName() :
+ binding.getExchange();
+
+ ssn.exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+ else
+ {
+ String subject = address.getSubject();
+ ssn.exchangeBind(subscriptionQueue,
+ address.getName(),
+ subject == null || subject.trim().equals("") ? "#" : subject,
+ null);
+ }
+
+ SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
+ Map<String,Object> arguments = settings_0_10.getArgs();
+ arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs());
+
+ if (link.getReliability() == Reliability.UNRELIABLE ||
+ link.getReliability() == Reliability.AT_MOST_ONCE)
+ {
+ settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
+ }
+
+ // for topics subscriptions are exclusive by default.
+ boolean exclusiveConsume =
+ (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+
+ ssn.messageSubscribe(subscriptionQueue,
+ settings_0_10.getSubscriptionTag(),
+ settings_0_10.getAcceptMode(),
+ settings_0_10.getAccquireMode(),
+ null, // resume id
+ 0, // resume ttl
+ arguments,
+ exclusiveConsume ? Option.EXCLUSIVE : Option.NONE);
+ }
+
+ @Override
+ public void deleteSubscription(Session session) throws Exception
+ {
+ org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+
+ if (link.getQueueBindings().size() > 0)
+ {
+ for (Binding binding: link.getQueueBindings())
+ {
+ String queue = binding.getQueue() == null?
+ subscriptionQueue: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ "anq.topic" :
+ binding.getExchange();
+
+ ssn.exchangeUnbind(queue,
+ exchange,
+ binding.getBindingKey(),
+ Option.NONE);
+ }
+ }
+ ssn.queueDelete(subscriptionQueue, Option.NONE);
+ }
+
+ private String getSubscriptionQueueName()
+ {
+ if (link.getName() == null)
+ {
+ return "TempQueue" + UUID.randomUUID();
+ }
+ else
+ {
+ return link.getName();
+ }
+ }
+
+ private String retrieveTopicName()
+ {
+ if (address.getSubject() != null && !address.getSubject().trim().equals(""))
+ {
+ return address.getSubject();
+ }
+ else if ("topic".equals(exchange.getExchangeType()))
+ {
+ return "#";
+ }
+ else
+ {
+ return "";
+ }
+ }
+
+ public String getSubscriptionQueue()
+ {
+ return subscriptionQueue;
+ }
+
+ public long getConsumerCapacity()
+ {
+ return link.getConsumerCapacity();
+ }
+
+ public long getProducerCapacity()
+ {
+ return link.getProducerCapacity();
+ }
+
+ public String toString()
+ {
+ return address.toString();
+ }
+}