summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java199
1 files changed, 24 insertions, 175 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 19720ea386..68b7cf1f88 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
@@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
+ doBind(destination, binding, queue, exchange);
}
}
@@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean nowait, int tag)
throws AMQException, FailoverException
{
- if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
- {
- if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
- {
- String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
-
- createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
- queueName = consumer.getDestination().getAMQQueueName();
- consumer.setQueuename(queueName);
- }
- handleLinkCreation(consumer.getDestination());
- }
+ queueName = preprocessAddressTopic(consumer, queueName);
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name.asString(), type.asString(), null,
+ arguments == null ? null : FieldTableSupport.convertToMap(arguments),
+ nowait, durable, autoDelete);
+ }
+
+
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
@@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @Override
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
+ @Override
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
Node node = dest.getNode();
@@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- /**
- * 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
- * 2.1 verify queue exists or create if create == true
- * 2.2 If not throw exception
- *
- * 3. if type == exchange,
- * 3.1 verify exchange exists or create if create == true
- * 3.2 if not throw exception
- * 3.3 if exchange exists (or created) create subscription queue.
- */
-
- @SuppressWarnings("deprecation")
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
- {
- if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
- {
- return;
- }
- else
- {
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
- {
- if(createNode)
- {
- setLegacyFieldsForQueueType(dest);
- handleQueueNodeCreation(dest,noLocal);
- break;
- }
- else if (isQueueExist(dest,assertNode))
- {
- setLegacyFieldsForQueueType(dest);
- break;
- }
- }
-
- case AMQDestination.TOPIC_TYPE:
- {
- if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- handleExchangeNodeCreation(dest);
- break;
- }
- else if (isExchangeExist(dest,assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- break;
- }
- }
-
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
- }
- dest.setAddressResolved(System.currentTimeMillis());
- }
- }
-
+ @Override
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void verifySubject(AMQDestination dest) throws AMQException
- {
- if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
- {
-
- if ("topic".equals(dest.getExchangeClass().toString()))
- {
- dest.setRoutingKey(new AMQShortString("#"));
- dest.setSubject(dest.getRoutingKey().toString());
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(""));
- dest.setSubject("");
- }
- }
- }
-
+ @Override
void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
Link link = dest.getLink();
@@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
bindingArguments);
}
- public void setLegacyFieldsForQueueType(AMQDestination dest)
- {
- // legacy support
- dest.setQueueName(new AMQShortString(dest.getAddressName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
-
- public void setLegacyFiledsForTopicType(AMQDestination dest)
- {
- // legacy support
- dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
- AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
- new AMQShortString(node.getExchangeType()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- }
-
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ @Override
+ protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
{
Node node = dest.getNode();
Map<String,Object> arguments = node.getDeclareArgs();
@@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ @Override
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
@@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
- void handleLinkCreation(AMQDestination dest) throws AMQException
- {
- createBindings(dest, dest.getLink().getBindings());
- }
-
- void createBindings(AMQDestination dest, List<Binding> bindings)
+ protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange)
{
- String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
- .getAddressName() : "amq.topic";
-
- String defaultQueueName = null;
- if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
- {
- defaultQueueName = dest.getQueueName();
- }
- else
- {
- defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
- }
-
- for (Binding binding: bindings)
- {
- String queue = binding.getQueue() == null?
- defaultQueueName: binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
- defaultExchangeForBinding :
- binding.getExchange();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
- " with args " + Strings.printMap(binding.getArgs()));
- }
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
- }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
}
void handleLinkDelete(AMQDestination dest) throws AMQException
@@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ @Override
void handleNodeDelete(AMQDestination dest) throws AMQException
{
if (AMQDestination.TOPIC_TYPE == dest.getAddressType())