From 865c0a4a39044ed9a1e7d37790c516e28761dd4c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 10 Feb 2014 12:41:39 +0000 Subject: Update to client to allow for request/response routing git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566595 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/amqp_1_0/jms/impl/DestinationImpl.java | 31 +++++++- .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 4 +- .../apache/qpid/amqp_1_0/jms/impl/MessageImpl.java | 17 +++- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 6 +- .../apache/qpid/amqp_1_0/jms/impl/QueueImpl.java | 20 ++++- .../qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java | 8 +- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 4 +- .../apache/qpid/amqp_1_0/jms/impl/TopicImpl.java | 20 ++++- .../amqp_1_0/jms/impl/TopicSubscriberImpl.java | 5 +- .../org/apache/qpid/amqp_1_0/client/Session.java | 92 +++++++++++++++------- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 23 +++--- .../server/management/amqp/ManagementNode.java | 6 +- 12 files changed, 173 insertions(+), 63 deletions(-) diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java index 924c5b9857..9f39d2c94f 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.Topic; import javax.jms.JMSException; +import java.util.UUID; import java.util.WeakHashMap; public class DestinationImpl implements Destination, Queue, Topic @@ -32,6 +33,7 @@ public class DestinationImpl implements Destination, Queue, Topic new WeakHashMap(); private final String _address; + private String _localTerminus; protected DestinationImpl(String address) { @@ -62,13 +64,24 @@ public class DestinationImpl implements Destination, Queue, Topic && _address.equals(((DestinationImpl)obj)._address); } - public static synchronized DestinationImpl createDestination(final String address) + public static synchronized DestinationImpl createDestination(String address) { - DestinationImpl destination = DESTINATION_CACHE.get(address); - if(destination == null) + DestinationImpl destination; + if (address.endsWith("!!")) { + address = address.substring(0, address.length() - 2); + String localTerminusName = UUID.randomUUID().toString(); destination = new DestinationImpl(address); - DESTINATION_CACHE.put(address, destination); + destination.setLocalTerminus(localTerminusName); + } + else + { + destination = DESTINATION_CACHE.get(address); + if (destination == null) + { + destination = new DestinationImpl(address); + DESTINATION_CACHE.put(address, destination); + } } return destination; } @@ -82,4 +95,14 @@ public class DestinationImpl implements Destination, Queue, Topic { return getAddress(); } + + void setLocalTerminus(final String localTerminus) + { + _localTerminus = localTerminus; + } + + String getLocalTerminus() + { + return _localTerminus; + } } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index 96ee1e984d..d7bb546d7a 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; @@ -159,7 +160,8 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { try { - return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, + String targetAddr = _destination.getLocalTerminus() != null ? _destination.getLocalTerminus() : UUID.randomUUID().toString(); + return _session.getClientSession().createReceiver(_session.toAddress(_destination), targetAddr, AcknowledgeMode.ALO, _linkName, _durable, getFilters(), null); } catch (ConnectionErrorException e) diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index fed9b5904f..f2d0cb5b18 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -75,6 +75,7 @@ public abstract class MessageImpl implements Message private boolean _isFromQueue; private boolean _isFromTopic; private long _expiration; + private DestinationImpl _replyTo; protected MessageImpl(Header header, MessageAnnotations messageAnnotations, @@ -182,11 +183,12 @@ public abstract class MessageImpl implements Message public DestinationImpl getJMSReplyTo() throws JMSException { - return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE))); + return _replyTo != null ? _replyTo : toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE))); } public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException { + _replyTo = (DestinationImpl) destination; if( destination==null ) { setReplyTo(null); @@ -194,9 +196,16 @@ public abstract class MessageImpl implements Message } else { - DecodedDestination dd = toDecodedDestination(destination); - setReplyTo(dd.getAddress()); - messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes())); + if(_replyTo.getLocalTerminus() != null) + { + setReplyTo(_replyTo.getLocalTerminus()); + } + else + { + DecodedDestination dd = toDecodedDestination(destination); + setReplyTo(dd.getAddress()); + messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes())); + } } } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index e459575974..b12540d597 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -77,7 +77,11 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { try { - _sender = _session.getClientSession().createSender(_session.toAddress(_destination), new Session.SourceConfigurator() + final String sourceName = _destination.getLocalTerminus() != null + ? _destination.getLocalTerminus() + : UUID.randomUUID().toString(); + + _sender = _session.getClientSession().createSender(sourceName, _session.toAddress(_destination), new Session.SourceConfigurator() { public void configureSource(final Source source) { diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java index cb56843a72..589ee5204f 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java @@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.jms.Queue; +import java.util.UUID; import java.util.WeakHashMap; public class QueueImpl extends DestinationImpl implements Queue @@ -37,13 +38,24 @@ public class QueueImpl extends DestinationImpl implements Queue return getAddress(); } - public static synchronized QueueImpl createQueue(final String address) + public static synchronized QueueImpl createQueue(String address) { - QueueImpl queue = QUEUE_CACHE.get(address); - if(queue == null) + QueueImpl queue; + if (address.endsWith("!!")) { + address = address.substring(0, address.length() - 2); + String localTerminusName = UUID.randomUUID().toString(); queue = new QueueImpl(address); - QUEUE_CACHE.put(address, queue); + queue.setLocalTerminus(localTerminusName); + } + else + { + queue = QUEUE_CACHE.get(address); + if(queue == null) + { + queue = new QueueImpl(address); + QUEUE_CACHE.put(address, queue); + } } return queue; } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java index 4e9e9d6a39..21161ff83f 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java @@ -24,6 +24,8 @@ import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.QueueReceiver; +import java.util.UUID; + public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver { QueueReceiverImpl(final QueueImpl destination, @@ -40,7 +42,11 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei { try { - return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination())); + final String targetAddr = + getDestination().getLocalTerminus() != null ? getDestination().getLocalTerminus() : UUID + .randomUUID().toString(); + return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()), + targetAddr); } catch (ConnectionErrorException e) { diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 2318b8ba9b..bd67ff681a 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -460,7 +460,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession { checkClosed(); checkNotTopicSession(); - return new QueueImpl(s); + return QueueImpl.valueOf(s); } public QueueReceiver createReceiver(final Queue queue) throws JMSException @@ -488,7 +488,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession { checkClosed(); checkNotQueueSession(); - return new TopicImpl(s); + return TopicImpl.valueOf(s); } public TopicSubscriber createSubscriber(final Topic topic) throws JMSException diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java index 5292944075..edc583e233 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java @@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.jms.Topic; +import java.util.UUID; import java.util.WeakHashMap; public class TopicImpl extends DestinationImpl implements Topic @@ -38,13 +39,24 @@ public class TopicImpl extends DestinationImpl implements Topic return getAddress(); } - public static synchronized TopicImpl createTopic(final String address) + public static synchronized TopicImpl createTopic(String address) { - TopicImpl topic = TOPIC_CACHE.get(address); - if(topic == null) + TopicImpl topic; + if (address.endsWith("!!")) { + address = address.substring(0, address.length() - 2); + String localTerminusName = UUID.randomUUID().toString(); topic = new TopicImpl(address); - TOPIC_CACHE.put(address, topic); + topic.setLocalTerminus(localTerminusName); + } + else + { + topic = TOPIC_CACHE.get(address); + if(topic == null) + { + topic = new TopicImpl(address); + TOPIC_CACHE.put(address, topic); + } } return topic; } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 69e07f30a1..b89025a27b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -19,6 +19,7 @@ package org.apache.qpid.amqp_1_0.jms.impl; import java.util.Map; +import java.util.UUID; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; @@ -67,7 +68,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub try { String address = getSession().toAddress(getDestination()); - Receiver receiver = getSession().getClientSession().createReceiver(address, + String targetAddress = getDestination().getLocalTerminus() != null ? getDestination().getLocalTerminus() : UUID.randomUUID().toString(); + + Receiver receiver = getSession().getClientSession().createReceiver(address, targetAddress, StdDistMode.COPY, AcknowledgeMode.ALO, getLinkName(), isDurable(), getFilters(), null); diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index cac4775b54..290895df60 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -78,8 +78,14 @@ public class Session public Sender createSender(final String targetName, final SourceConfigurator configurator) throws Sender.SenderCreationException, ConnectionClosedException { - final String sourceName = UUID.randomUUID().toString(); + return createSender(sourceName, targetName, configurator); + } + + public Sender createSender(final String sourceName, final String targetName, final SourceConfigurator configurator) + throws Sender.SenderCreationException, ConnectionClosedException + { + return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false) { @Override @@ -150,93 +156,118 @@ public class Session public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException { - return createReceiver(sourceAddr, null, AcknowledgeMode.ALO); + return createReceiver(sourceAddr, UUID.randomUUID().toString(), null, AcknowledgeMode.ALO); } public Receiver createReceiver(final String queue, final AcknowledgeMode mode) throws ConnectionErrorException { - return createReceiver(queue, null, mode); + return createReceiver(queue, UUID.randomUUID().toString(), null, mode); } public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName) throws ConnectionErrorException { - return createReceiver(queue, null, mode, linkName); + return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName); } public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable) throws ConnectionErrorException { - return createReceiver(queue, null, mode, linkName, isDurable); + return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable); } public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable, Map filters, Map unsettled) throws ConnectionErrorException { - return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled); + return createReceiver(queue, (DistributionMode) null, mode, linkName, isDurable, filters, unsettled); } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, - boolean isDurable, Map unsettled) + public Receiver createReceiver(final String queue, String targetName, final AcknowledgeMode mode, String linkName, boolean isDurable, + Map filters, Map unsettled) throws ConnectionErrorException { - return createReceiver(queue, null, mode, linkName, isDurable, unsettled); + return createReceiver(queue, targetName, null, mode, linkName, isDurable, filters, unsettled); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode) + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, + boolean isDurable, Map unsettled) throws ConnectionErrorException { - return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO); + return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable, unsettled); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName) + + private synchronized Receiver createReceiver(final String sourceAddr, + final String targetAddr, + DistributionMode mode) throws ConnectionErrorException { - return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName); + return createReceiver(sourceAddr, targetAddr, mode, AcknowledgeMode.ALO); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode) + private synchronized Receiver createReceiver(final String sourceAddr, + final String targetAddr, + DistributionMode mode, + final AcknowledgeMode ackMode) throws ConnectionErrorException { - return createReceiver(sourceAddr, mode, ackMode, null); + return createReceiver(sourceAddr, targetAddr, mode, ackMode, null); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName) + private synchronized Receiver createReceiver(final String sourceAddr, + final String targetAddr, + DistributionMode mode, + final AcknowledgeMode ackMode, + String linkName) throws ConnectionErrorException { - return createReceiver(sourceAddr,mode, ackMode, linkName, false); + return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, false); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable) + private synchronized Receiver createReceiver(final String sourceAddr, + final String targetAddr, + DistributionMode mode, + final AcknowledgeMode ackMode, + String linkName, + boolean isDurable) throws ConnectionErrorException { - return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null); + return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, isDurable, null); } - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable, - Map unsettled) + private synchronized Receiver createReceiver(final String sourceAddr, + final String targetAddr, + DistributionMode mode, + final AcknowledgeMode ackMode, + String linkName, + boolean isDurable, + Map unsettled) throws ConnectionErrorException { return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled); } public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable, + Map filters, Map unsettled) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, UUID.randomUUID().toString(), mode, ackMode, linkName, isDurable, filters, unsettled); + } + + public synchronized Receiver createReceiver(final String sourceAddr, String targetAddr, DistributionMode mode, final AcknowledgeMode ackMode, String linkName, boolean isDurable, Map filters, Map unsettled) throws ConnectionErrorException { final Target target = new Target(); + target.setAddress(targetAddr); final Source source = new Source(); source.setAddress(sourceAddr); source.setDistributionMode(mode); @@ -258,12 +289,17 @@ public class Session public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException { - return createReceiver(sourceAddr, StdDistMode.COPY); + return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.COPY); } public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException { - return createReceiver(sourceAddr, StdDistMode.MOVE); + return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE); + } + + public synchronized Receiver createMovingReceiver(final String sourceAddr, final String targetAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE); } public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 4ca1c7dac0..f796a4b2e3 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -43,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; @@ -393,9 +384,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS try { + final String name; + if(getEndpoint().getTarget() instanceof Target) + { + Target target = (Target) getEndpoint().getTarget(); + name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress(); + } + else + { + name = getEndpoint().getName(); + } _consumer = _queue.addConsumer(_target, messageFilter == null ? null : new SimpleFilterManager(messageFilter), - Message_1_0.class, getEndpoint().getName(), options); + Message_1_0.class, name, options); } catch (AMQException e) { diff --git a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 18795f58b8..1c1c72dd0b 100644 --- a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -336,11 +336,13 @@ class ManagementNode implements MessageSource