summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java227
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java199
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java604
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java51
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java83
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
14 files changed, 1044 insertions, 351 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b64d355f80..2a91ff3ce2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+
private final long _connectionNumber;
/**
@@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
private AMQConnectionDelegate _delegate;
@@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- close(DEFAULT_TIMEOUT);
+ close(DEFAULT_CLOSE_TIMEOUT);
}
public void close(long timeout) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 5242629a91..9650ad76fb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
private boolean _messageCompressionSupported;
+ private boolean _addrSyntaxSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public AMQConnectionDelegate_8_0(AMQConnection conn)
{
_conn = conn;
+ _addrSyntaxSupported =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8,
+ String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT)));
}
protected boolean checkException(Throwable thrown)
@@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _messageCompressionSupported;
}
+
+ public boolean isAddrSyntaxSupported()
+ {
+ return _addrSyntaxSupported;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index e06fc0f1de..2714caf2a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -20,6 +20,20 @@
*/
package org.apache.qpid.client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Destination;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import javax.jms.Destination;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
public abstract class AMQDestination implements Destination, Referenceable, Externalizable
{
@@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
_address = addr;
}
- public int getAddressType(){
+ public int getAddressType()
+ {
return _addressType;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c2659194e2..0183c30276 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ListMessage;
@@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
/*
* TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For
@@ -310,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _immediatePrefetch;
}
+ abstract void handleNodeDelete(final AMQDestination dest) throws AMQException;
+
+ abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -600,6 +607,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(new AMQShortString(dest.getAddressName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+
+ public void setLegacyFieldsForTopicType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setExchangeName(new AMQShortString(dest.getAddressName()));
+ Node node = dest.getNode();
+ dest.setExchangeClass(node.getExchangeType() == null?
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
+ new AMQShortString(node.getExchangeType()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ }
+
+ protected void verifySubject(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
+ {
+
+ if ("topic".equals(dest.getExchangeClass().toString()))
+ {
+ dest.setRoutingKey(new AMQShortString("#"));
+ dest.setSubject(dest.getRoutingKey().toString());
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(""));
+ dest.setSubject("");
+ }
+ }
+ }
+
+ public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ /**
+ * 1. Try to resolve the address type (queue or exchange)
+ * 2. if type == queue,
+ * 2.1 verify queue exists or create if create == true
+ * 2.2 If not throw exception
+ *
+ * 3. if type == exchange,
+ * 3.1 verify exchange exists or create if create == true
+ * 3.2 if not throw exception
+ * 3.3 if exchange exists (or created) create subscription queue.
+ */
+
+ @SuppressWarnings("deprecation")
+ public void resolveAddress(AMQDestination dest,
+ boolean isConsumer,
+ boolean noLocal) throws AMQException
+ {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
+ {
+ return;
+ }
+ else
+ {
+ boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER);
+
+
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
+ break;
+ }
+ else if (isQueueExist(dest,assertNode))
+ {
+ setLegacyFieldsForQueueType(dest);
+ break;
+ }
+ }
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ handleExchangeNodeCreation(dest);
+ break;
+ }
+ else if (isExchangeExist(dest,assertNode))
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ break;
+ }
+ }
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
+ }
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
+ }
+
+ public abstract int resolveAddressType(AMQDestination dest) throws AMQException;
+
protected abstract void acknowledgeImpl() throws JMSException;
/**
@@ -2594,6 +2723,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+
+ void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doBind(dest, binding, queue, exchange);
+ }
+ }
+
+ protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException;
+
+ abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException;
+
+ abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange)
+ throws AMQException;
+
public abstract void sendConsume(C consumer, AMQShortString queueName,
boolean nowait, int tag) throws AMQException, FailoverException;
@@ -2696,7 +2873,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws AMQException If the exchange cannot be declared for any reason.
* TODO Be aware of possible changes to parameter order as versions change.
*/
- private void declareExchange(final AMQShortString name, final AMQShortString type,
+ void declareExchange(final AMQShortString name, final AMQShortString type,
final boolean nowait, final boolean durable,
final boolean autoDelete, final boolean internal) throws AMQException
{
@@ -2710,9 +2887,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
+ void declareExchange(final AMQShortString name, final AMQShortString type,
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final FieldTable arguments,
+ final boolean passive) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive);
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+ protected AMQShortString preprocessAddressTopic(final C consumer,
+ AMQShortString queueName) throws AMQException
+ {
+ if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
+ return queueName;
+ }
+
+ abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException;
+
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
+
+ public abstract void sendExchangeDeclare(final AMQShortString name,
+ final AMQShortString type,
+ final boolean nowait,
+ boolean durable,
+ boolean autoDelete,
+ FieldTable arguments,
+ final boolean passive) throws AMQException, FailoverException;
+
/**
* Declares a queue for a JMS destination.
* <p>
@@ -2930,10 +3151,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
throws AMQException;
- public abstract void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException;
-
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(producerId, producer);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 19720ea386..68b7cf1f88 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
@@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
+ doBind(destination, binding, queue, exchange);
}
}
@@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean nowait, int tag)
throws AMQException, FailoverException
{
- if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
- {
- if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
- {
- String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
-
- createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
- queueName = consumer.getDestination().getAMQQueueName();
- consumer.setQueuename(queueName);
- }
- handleLinkCreation(consumer.getDestination());
- }
+ queueName = preprocessAddressTopic(consumer, queueName);
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name.asString(), type.asString(), null,
+ arguments == null ? null : FieldTableSupport.convertToMap(arguments),
+ nowait, durable, autoDelete);
+ }
+
+
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
@@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @Override
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
+ @Override
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
Node node = dest.getNode();
@@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- /**
- * 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
- * 2.1 verify queue exists or create if create == true
- * 2.2 If not throw exception
- *
- * 3. if type == exchange,
- * 3.1 verify exchange exists or create if create == true
- * 3.2 if not throw exception
- * 3.3 if exchange exists (or created) create subscription queue.
- */
-
- @SuppressWarnings("deprecation")
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
- {
- if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
- {
- return;
- }
- else
- {
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
- {
- if(createNode)
- {
- setLegacyFieldsForQueueType(dest);
- handleQueueNodeCreation(dest,noLocal);
- break;
- }
- else if (isQueueExist(dest,assertNode))
- {
- setLegacyFieldsForQueueType(dest);
- break;
- }
- }
-
- case AMQDestination.TOPIC_TYPE:
- {
- if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- handleExchangeNodeCreation(dest);
- break;
- }
- else if (isExchangeExist(dest,assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- break;
- }
- }
-
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
- }
- dest.setAddressResolved(System.currentTimeMillis());
- }
- }
-
+ @Override
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void verifySubject(AMQDestination dest) throws AMQException
- {
- if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
- {
-
- if ("topic".equals(dest.getExchangeClass().toString()))
- {
- dest.setRoutingKey(new AMQShortString("#"));
- dest.setSubject(dest.getRoutingKey().toString());
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(""));
- dest.setSubject("");
- }
- }
- }
-
+ @Override
void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
Link link = dest.getLink();
@@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
bindingArguments);
}
- public void setLegacyFieldsForQueueType(AMQDestination dest)
- {
- // legacy support
- dest.setQueueName(new AMQShortString(dest.getAddressName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
-
- public void setLegacyFiledsForTopicType(AMQDestination dest)
- {
- // legacy support
- dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
- AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
- new AMQShortString(node.getExchangeType()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- }
-
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ @Override
+ protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
{
Node node = dest.getNode();
Map<String,Object> arguments = node.getDeclareArgs();
@@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ @Override
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
@@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
- void handleLinkCreation(AMQDestination dest) throws AMQException
- {
- createBindings(dest, dest.getLink().getBindings());
- }
-
- void createBindings(AMQDestination dest, List<Binding> bindings)
+ protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange)
{
- String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
- .getAddressName() : "amq.topic";
-
- String defaultQueueName = null;
- if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
- {
- defaultQueueName = dest.getQueueName();
- }
- else
- {
- defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
- }
-
- for (Binding binding: bindings)
- {
- String queue = binding.getQueue() == null?
- defaultQueueName: binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
- defaultExchangeForBinding :
- binding.getExchange();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
- " with args " + Strings.printMap(binding.getArgs()));
- }
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
- }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
}
void handleLinkDelete(AMQDestination dest) throws AMQException
@@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ @Override
void handleNodeDelete(AMQDestination dest) throws AMQException
{
if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index dbbc300910..0145d15111 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -29,7 +29,9 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -48,10 +50,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -59,6 +65,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -170,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest,
+ final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException, FailoverException
{
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
- (getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(getChannelId()), QueueBindOkBody.class);
+ if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+ (getTicket(), queueName, exchangeName, routingKey, false, arguments).
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+
+ }
+ else
+ {
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
+ List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
+ bindings.addAll(destination.getNode().getBindings());
+
+ String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
+ destination.getAddressName(): "amq.topic";
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+ // The null check below is a way to side step that issue while fixing QPID-4146
+ // Note this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchange :
+ binding.getExchange();
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ doBind(destination, binding, queue, exchange);
+ }
+ }
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -230,9 +274,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
table.setObject(entry.getKey(), entry.getValue());
}
}
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ sendQueueDeclare(name, durable, exclusive, autoDelete, table, false);
}
public void sendRecover() throws AMQException, FailoverException
@@ -428,6 +470,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return (responseBody.getReplyCode() == 0);
}
+
+ protected boolean exchangeExists(final AMQShortString exchangeName)
+ throws AMQException
+ {
+ if(!getAMQConnection().getDelegate().supportsIsBound())
+ {
+ return false;
+ }
+
+ AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ return sendExchangeBound(exchangeName, null, null);
+
+ }
+ }, getAMQConnection()).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ // valid if no issues, or just no bindings
+ return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3);
+ }
+
private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
AMQShortString routingKey,
AMQShortString queueName) throws AMQException, FailoverException
@@ -444,6 +512,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
boolean nowait,
int tag) throws AMQException, FailoverException
{
+ queueName = preprocessAddressTopic(consumer, queueName);
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -468,6 +537,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
@Override
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
+ {
+ final Link link = dest.getLink();
+ final String queueName ;
+
+ if (dest.getQueueName() == null)
+ {
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
+ }
+ else
+ {
+ queueName = dest.getQueueName();
+ }
+
+ final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ final Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ // not setting alternate exchange
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ link.isDurable(),
+ queueProps.isExclusive(),
+ queueProps.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+
+ final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments);
+ doBind(dest, binding, queueName, dest.getAddressName());
+
+ }
+
+ @Override
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
@@ -481,17 +605,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(),
+ name,
+ type,
+ passive || name.toString().startsWith("amq."),
+ durable,
+ autoDelete,
+ false,
+ false,
+ arguments);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
+ public void sendExchangeDelete(final String name) throws AMQException, FailoverException
+ {
+ ExchangeDeleteBody body =
+ getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
+ AMQShortString queueName = amqd.getAMQQueueName();
+ boolean durable = amqd.isDurable();
+ boolean exclusive = amqd.isExclusive();
+ boolean autoDelete = amqd.isAutoDelete();
+ FieldTable arguments = null;
+ sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive);
+ }
+
+ private void sendQueueDeclare(final AMQShortString queueName,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete, final FieldTable arguments, final boolean passive)
+ throws AMQException, FailoverException
+ {
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
+ queueName,
passive,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
+ durable,
+ exclusive,
+ autoDelete,
false,
- null);
+ arguments);
AMQFrame queueDeclare = body.generateFrame(getChannelId());
@@ -699,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
- true,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
- false,
- null).generateFrame(getChannelId());
- QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler.getMessageCount();
+ if(isBound(null, amqd.getAMQQueueName(), null))
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(getChannelId());
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ return okHandler.getMessageCount();
+ }
+ else
+ {
+ return 0l;
+ }
}
protected boolean tagLE(long tag1, long tag2)
@@ -733,14 +908,387 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
+ @Override
+ public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal)
+ throws AMQException
+ {
+ if(!isAddrSyntaxSupported())
+ {
+ throw new UnsupportedAddressSyntaxException(dest);
+ }
+ super.resolveAddress(dest, isConsumer, noLocal);
+ }
+
+ private boolean isAddrSyntaxSupported()
+ {
+ return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported();
+ }
+
+ public int resolveAddressType(AMQDestination dest) throws AMQException
+ {
+ int type = dest.getAddressType();
+ String name = dest.getAddressName();
+ if (type != AMQDestination.UNKNOWN_TYPE)
+ {
+ return type;
+ }
+ else
+ {
+ boolean isExchange = exchangeExists(AMQShortString.valueOf(name));
+ boolean isQueue = isBound(null,AMQShortString.valueOf(name), null);
+
+ if (!isExchange && !isQueue)
+ {
+ type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isExchange)
+ {
+ //name refers to a queue
+ type = AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isQueue)
+ {
+ //name refers to an exchange
+ type = AMQDestination.TOPIC_TYPE;
+ }
+ else
+ {
+ //both a queue and exchange exist for that name
+ throw new AMQException("Ambiguous address, please specify queue or topic as node type");
+ }
+ dest.setAddressType(type);
+ return type;
+ }
+ }
+
+ protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ final Node node = dest.getNode();
+ final Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+ String altExchange = node.getAlternateExchange();
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()),
+ node.isDurable(),
+ node.isExclusive(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ String altExchange = dest.getNode().getAlternateExchange();
+ Map<String, Object> arguments = node.getDeclareArgs();
+
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
+ // can't set alt. exchange
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments), false);
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+
+ protected void doBind(final AMQDestination dest,
+ final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
{
- throw new UnsupportedAddressSyntaxException(dest);
+ final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ QueueBindBody queueBindBody =
+ methodRegistry.createQueueBindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ false,
+ FieldTable.convertToFieldTable(binding.getArgs()));
+
+ getProtocolHandler().syncWrite(queueBindBody.
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+ return null;
+ }
+ }, getAMQConnection()).execute();
+
}
+ protected void doUnbind(final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ if (isBound(null, AMQShortString.valueOf(queue), null))
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ AMQMethodBody body;
+ if (methodRegistry instanceof MethodRegistry_0_9)
+ {
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
+ body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ null);
+ }
+ else if (methodRegistry instanceof MethodRegistry_0_91)
+ {
+ MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
+ body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(binding.getBindingKey()),
+ null);
+
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
+ getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
+ return null;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }, getAMQConnection()).execute();
+ }
+
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
+ {
+ Node node = dest.getNode();
+ return isQueueExist(dest.getAddressName(), assertNode,
+ node.isDurable(), node.isAutoDelete(),
+ node.isExclusive(), node.getDeclareArgs());
+ }
+
+ public boolean isQueueExist(final String queueName, boolean assertNode,
+ final boolean durable, final boolean autoDelete,
+ final boolean exclusive, final Map<String, Object> args) throws AMQException
+ {
+ boolean match = isBound(null,AMQShortString.valueOf(queueName), null);
+
+ if (assertNode)
+ {
+ if(!match)
+ {
+ throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." );
+
+ }
+ else
+ {
+
+ new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ durable,
+ exclusive,
+ autoDelete,
+ FieldTable.convertToFieldTable(args),
+ true);
+
+ return null;
+ }
+ }, getAMQConnection());
+
+ }
+ }
+
+
+ return match;
+ }
+
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
+ {
+ boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName()));
+
+ Node node = dest.getNode();
+
+ if (match)
+ {
+ if (assertNode)
+ {
+
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(node.getDeclareArgs()), true);
+
+ }
+ else
+ {
+ // TODO - some way to determine the exchange type
+ /*
+ _logger.debug("Setting Exchange type " + result.getType());
+ node.setExchangeType(result.getType());
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ */
+
+ }
+ }
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +". Exchange not found.");
+ }
+ }
+
+ return match;
+ }
+
+ @Override
+ void handleNodeDelete(final AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDelete(dest.getAddressName());
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendQueueDelete(AMQShortString.valueOf(dest.getAddressName()));
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ }
+
+ @Override
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doUnbind(binding, queue, exchange);
+ }
+ }
+
+
+ void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest.getQueueName(), false, false, false, false, null))
+ {
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDelete(AMQShortString.valueOf(dest.getQueueName()));
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+ }
+ }
+
protected void flushAcknowledgments()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 01e89b78c1..187be8522c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,19 +20,35 @@
*/
package org.apache.qpid.client;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
public boolean isExclusive()
{
- return _exclusive;
+
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
}
public boolean isReceiving()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 658fb25ce4..8f91a7db08 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -17,12 +17,18 @@
*/
package org.apache.qpid.client;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This is a 0.10 message consumer.
*/
@@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
-
- public boolean isExclusive()
- {
- AMQDestination dest = this.getDestination();
- if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
- {
- return true;
- }
- else
- {
- return dest.getLink().getSubscription().isExclusive();
- }
- }
- else
- {
- return super.isExclusive();
- }
- }
+
void postSubscription() throws AMQException
{
@@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ getSession().handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
- ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ getSession().handleLinkDelete(dest);
if (!isDurableSubscriber())
{
((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
@@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index f735895c81..cdffc73932 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();
+
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
+ if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
+ {
+ boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
+
+ if (!namedQueue)
+ {
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
+ }
+ }
+
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
@@ -105,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
-
+ postSubscription();
+ getSession().sync();
if (_logger.isDebugEnabled())
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
+ void postSubscription() throws AMQException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.RECEIVER )
+ {
+ getSession().handleNodeDelete(dest);
+ }
+ // Subscription queue is handled as part of linkDelete method.
+ getSession().handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ getSession().deleteSubscriptionQueue(dest);
+ }
+ }
+ }
+
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 33bafe8f20..1d47ce9a07 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.UUID;
+
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -32,13 +33,15 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+
+import org.slf4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
setClosed();
_session.deregisterProducer(_producerId);
+ AMQDestination dest = getAMQDestination();
+ AMQSession ssn = getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ try
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.SENDER )
+ {
+ ssn.handleNodeDelete(dest);
+ }
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
+ }
+ }
}
public void send(Message message) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index eb8104b02c..06a3b08272 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
try
{
getSession().resolveAddress(destination,false,false);
- ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
- ((AMQSession_0_10)getSession()).sync();
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
catch(Exception e)
{
@@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
public void close() throws JMSException
{
super.close();
- AMQDestination dest = getAMQDestination();
- AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
- if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- try
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- ssn.handleNodeDelete(dest);
- }
- ssn.handleLinkDelete(dest);
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
- ex.setLinkedException(e);
- ex.initCause(e);
- throw ex;
- }
- }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 355c456249..e1b399e10a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.util.GZIPUtils;
@@ -57,30 +60,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
}
- void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination) throws AMQException
{
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- throw new UnsupportedAddressSyntaxException(destination);
- }
+ getSession().resolveAddress(destination, false, false);
- if(getSession().isDeclareExchanges())
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
+ }
+ else
{
- final MethodRegistry methodRegistry = getSession().getMethodRegistry();
- ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- destination.getExchangeName().toString().startsWith("amq."),
- destination.isExchangeDurable(),
- destination.isExchangeAutoDelete(),
- destination.isExchangeInternal(),
- true,
- null);
- AMQFrame declare = body.generateFrame(getChannelId());
-
- getConnection().getProtocolHandler().writeFrame(declare);
+ if (getSession().isDeclareExchanges())
+ {
+ final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+ ExchangeDeclareBody body =
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ destination.getExchangeName()
+ .toString()
+ .startsWith("amq."),
+ destination.isExchangeDurable(),
+ destination.isExchangeAutoDelete(),
+ destination.isExchangeInternal(),
+ true,
+ null);
+ AMQFrame declare = body.generateFrame(getChannelId());
+
+ getConnection().getProtocolHandler().writeFrame(declare);
+ }
}
}
@@ -88,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
+
+
+
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+
+ AMQShortString routingKey = destination.getRoutingKey();
+
+ FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null
+ || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
+ {
+
+ if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ {
+ // use default subject in address string
+ headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
+ }
+
+ if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT));
+ }
+ }
+
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+ destination.getExchangeName(),
+ routingKey,
+ mandatory,
+ immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
- AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
- BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
contentHeaderProperties.setUserId(getUserID());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index ad9a37479e..bd089eb6a8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,6 +21,23 @@
package org.apache.qpid.client.message;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +45,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
@@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.TransportException;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
/**
* This extends AbstractAMQMessageDelegate which contains common code between
* both the 0_8 and 0_10 Message types.
@@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
try
{
- int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
+ int type = getAMQSession().resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
+ getAMQSession().setLegacyFieldsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
+ getAMQSession().setLegacyFieldsForTopicType(amqd);
}
}
catch(AMQException ex)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 21f1623dd1..747668ff9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T>
{
_waiting.set(true);
- while (!_ready)
+ while (!_ready && _error == null)
{
try
{