summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java439
1 files changed, 49 insertions, 390 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3812e612aa..40c9113a2d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -23,11 +23,8 @@ import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -37,8 +34,6 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -47,18 +42,15 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.messaging.address.AddressResolver;
+import org.apache.qpid.messaging.address.amqp_0_10.SubscriptionSettings_0_10;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -143,7 +135,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* USed to store the range of in tx messages
*/
private RangeSet _txRangeSet = new RangeSet();
- private int _txSize = 0;
+ private int _txSize = 0;
+ private AddressResolver addressResolver;
//--- constructors
/**
@@ -345,31 +338,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- List<Binding> bindings = new ArrayList<Binding>();
- bindings.addAll(destination.getSourceNode().getBindings());
- bindings.addAll(destination.getTargetNode().getBindings());
-
- String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
- destination.getAddressName(): "amq.topic";
-
- for (Binding binding: bindings)
- {
- String queue = binding.getQueue() == null?
- queueName.asString(): binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
- defaultExchange :
- binding.getExchange();
-
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
- " with args " + printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
- }
+ // do nothing atm
+ // when creating a producer or consumer the create/assert method should be invokved
+ // for consumers create and delete subscriptions should be called in the constructor and close()
+ // when closing them the delete method should be invoked
}
if (!nowait)
@@ -573,45 +545,51 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
boolean preAcquire;
- long capacity = getCapacity(consumer.getDestination());
+ long capacity;
+ try
+ {
+ capacity = consumer.getDestination().getConsumerCapacity(this);
+ }
+ catch (Exception e1)
+ {
+ AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR, "Error retrieving capacity",e1);
+ throw ex;
+ }
try
{
- boolean isTopic;
+ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
- isTopic = consumer.getDestination() instanceof AMQTopic ||
+ boolean isTopic = consumer.getDestination() instanceof AMQTopic ||
consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
preAcquire = isTopic || (!consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
+
+ getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED,
+ null, 0, arguments,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
else
{
- isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-
+ AddressBasedDestination dest = (AddressBasedDestination)consumer.getDestination();
preAcquire = !consumer.isNoConsume() &&
- (isTopic || consumer.getMessageSelector() == null ||
+ (dest.isTopic() || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
- arguments.putAll(
- (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
+ SubscriptionSettings_0_10 settings = new SubscriptionSettings_0_10();
+ settings.setAcceptMode(acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT);
+ settings.setAccquireMode(preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED);
+ settings.setArgs(arguments);
+ settings.setMessageSelector(messageSelector);
+ settings.setSubscriptionTag(String.valueOf(tag));
+ dest.createSubscription(this,settings);
}
-
- boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-
- if (consumer.getDestination().getLink() != null)
- {
- acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
- }
-
- getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
- acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
- preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
@@ -646,21 +624,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private long getCapacity(AMQDestination destination)
- {
- long capacity = 0;
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (prefetch())
- {
- capacity = getAMQConnection().getMaxPrefetch();
- }
- return capacity;
- }
-
/**
* Create an 0_10 message producer
*/
@@ -775,12 +738,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- QueueNode node = (QueueNode)amqd.getSourceNode();
- getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
- node.getDeclareArgs(),
- node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
- node.isDurable() ? Option.DURABLE : Option.NONE,
- node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ // do nothing
}
// passive --> false
@@ -825,7 +783,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//only set if msg list is null
try
{
- long capacity = getCapacity(consumer.getDestination());
+ long capacity = consumer.getDestination().getConsumerCapacity(this);
if (capacity == 0)
{
@@ -1056,317 +1014,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return AMQMessageDelegateFactory.FACTORY_0_10;
}
-
- public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
- {
- boolean match = true;
- ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
- match = !result.getNotFound();
-
- if (match)
- {
- if (assertNode)
- {
- match = (result.getDurable() == node.isDurable()) &&
- (node.getExchangeType() != null &&
- node.getExchangeType().equals(result.getType())) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (node.getExchangeType() != null)
- {
- // even if assert is false, better to verify this
- match = node.getExchangeType().equals(result.getType());
- if (!match)
- {
- _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
- " actual " + result.getType());
- }
- }
- else
- {
- _logger.debug("Setting Exchange type " + result.getType());
- node.setExchangeType(result.getType());
- dest.setExchangeClass(new AMQShortString(result.getType()));
- }
- }
-
- return match;
- }
-
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
- {
- boolean match = true;
- try
- {
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
- {
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (match)
- {
- // should I use the queried details to update the local data structure.
- }
- }
- catch(SessionException e)
- {
- if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
- {
- match = false;
- }
- else
- {
- throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
- "Error querying queue",e);
- }
- }
-
- return match;
- }
-
- private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
- {
- boolean match = true;
- for (String key: source.keySet())
- {
- match = target.containsKey(key) &&
- target.get(key).equals(source.get(key));
-
- if (!match)
- {
- StringBuffer buf = new StringBuffer();
- buf.append("Property given in address did not match with the args sent by the broker.");
- buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
- buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
- _logger.debug(buf.toString());
- return match;
- }
- }
-
- return match;
- }
-
- /**
- * 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
- * 2.1 verify queue exists or create if create == true
- * 2.2 If not throw exception
- *
- * 3. if type == exchange,
- * 3.1 verify exchange exists or create if create == true
- * 3.2 if not throw exception
- * 3.3 if exchange exists (or created) create subscription queue.
- */
-
- @SuppressWarnings("deprecation")
- public void handleAddressBasedDestination(AMQDestination dest,
- boolean isConsumer,
- boolean noWait) throws AMQException
- {
- if (dest.isAddressResolved())
- {
- if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
- {
- createSubscriptionQueue(dest);
- }
- }
- else
- {
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- if (type == AMQDestination.QUEUE_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.UNRELIABLE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
- {
- throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
- }
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
- {
- if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
- {
- setLegacyFiledsForQueueType(dest);
- break;
- }
- else if(createNode)
- {
- setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,false,noWait);
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
- break;
- }
- }
-
- case AMQDestination.TOPIC_TYPE:
- {
- if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest);
- }
- break;
- }
- else if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- sendExchangeDeclare(dest.getAddressName(),
- dest.getExchangeClass().asString(),
- dest.getTargetNode().getAlternateExchange(),
- dest.getTargetNode().getDeclareArgs(),
- false);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest);
- }
- break;
- }
- }
-
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
- }
- dest.setAddressResolved(true);
- }
- }
-
- public int resolveAddressType(AMQDestination dest) throws AMQException
- {
- int type = dest.getAddressType();
- String name = dest.getAddressName();
- if (type != AMQDestination.UNKNOWN_TYPE)
- {
- return type;
- }
- else
- {
- ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get();
- if (result.getQueueNotFound() && result.getExchangeNotFound()) {
- //neither a queue nor an exchange exists with that name; treat it as a queue
- type = AMQDestination.QUEUE_TYPE;
- } else if (result.getExchangeNotFound()) {
- //name refers to a queue
- type = AMQDestination.QUEUE_TYPE;
- } else if (result.getQueueNotFound()) {
- //name refers to an exchange
- type = AMQDestination.TOPIC_TYPE;
- } else {
- //both a queue and exchange exist for that name
- throw new AMQException("Ambiguous address, please specify queue or topic as node type");
- }
- dest.setAddressType(type);
- dest.rebuildTargetAndSourceNodes(type);
- return type;
- }
- }
-
- private void verifySubject(AMQDestination dest) throws AMQException
- {
- if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
- {
-
- if ("topic".equals(dest.getExchangeClass().toString()))
- {
- dest.setRoutingKey(new AMQShortString("#"));
- dest.setSubject(dest.getRoutingKey().toString());
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(""));
- dest.setSubject("");
- }
- }
- }
-
- private void createSubscriptionQueue(AMQDestination dest) throws AMQException
- {
- QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
-
- if (dest.getQueueName() == null)
- {
- if (dest.getLink() != null && dest.getLink().getName() != null)
- {
- dest.setQueueName(new AMQShortString(dest.getLink().getName()));
- }
- }
- node.setExclusive(true);
- node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,false,true);
- node.addBinding(new Binding(dest.getAddressName(),
- dest.getQueueName(),// should have one by now
- dest.getSubject(),
- Collections.<String,Object>emptyMap()));
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
- }
-
- public void setLegacyFiledsForQueueType(AMQDestination dest)
- {
- // legacy support
- dest.setQueueName(new AMQShortString(dest.getAddressName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
-
- public void setLegacyFiledsForTopicType(AMQDestination dest)
- {
- // legacy support
- dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- ExchangeNode node = (ExchangeNode)dest.getTargetNode();
- dest.setExchangeClass(node.getExchangeType() == null?
- ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
- new AMQShortString(node.getExchangeType()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- }
-
- /** This should be moved to a suitable utility class */
- private String printMap(Map<String,Object> map)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("<");
- if (map != null)
- {
- for(String key : map.keySet())
- {
- sb.append(key).append(" = ").append(map.get(key)).append(" ");
- }
- }
- sb.append(">");
- return sb.toString();
- }
protected void acknowledgeImpl()
{
@@ -1389,4 +1036,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_highestDeliveryTag.set(-1);
super.resubscribe();
}
+
+ public boolean isQueueExist(String queue)
+ {
+ QueueQueryResult result = _qpidSession.queueQuery(queue, Option.NONE).get();
+ return queue.equals(result.getQueue());
+ }
+
+ public boolean isExchangeExist(String exchange)
+ {
+ ExchangeQueryResult result = _qpidSession.exchangeQuery(exchange, Option.NONE).get();
+ return !result.getNotFound();
+ }
}