summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-10 12:41:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-10 12:41:39 +0000
commit865c0a4a39044ed9a1e7d37790c516e28761dd4c (patch)
tree0f1c39ed399ff74eae94b17613ea81bf9fdb362f
parent47471d475f9e8109d30c38d885f683a1ae86ce12 (diff)
downloadqpid-python-865c0a4a39044ed9a1e7d37790c516e28761dd4c.tar.gz
Update to client to allow for request/response routing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566595 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java31
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java4
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java17
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java6
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java20
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java8
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java4
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java20
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java5
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java92
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java23
-rw-r--r--java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java6
12 files changed, 173 insertions, 63 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
index 924c5b9857..9f39d2c94f 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.Topic;
import javax.jms.JMSException;
+import java.util.UUID;
import java.util.WeakHashMap;
public class DestinationImpl implements Destination, Queue, Topic
@@ -32,6 +33,7 @@ public class DestinationImpl implements Destination, Queue, Topic
new WeakHashMap<String, DestinationImpl>();
private final String _address;
+ private String _localTerminus;
protected DestinationImpl(String address)
{
@@ -62,13 +64,24 @@ public class DestinationImpl implements Destination, Queue, Topic
&& _address.equals(((DestinationImpl)obj)._address);
}
- public static synchronized DestinationImpl createDestination(final String address)
+ public static synchronized DestinationImpl createDestination(String address)
{
- DestinationImpl destination = DESTINATION_CACHE.get(address);
- if(destination == null)
+ DestinationImpl destination;
+ if (address.endsWith("!!"))
{
+ address = address.substring(0, address.length() - 2);
+ String localTerminusName = UUID.randomUUID().toString();
destination = new DestinationImpl(address);
- DESTINATION_CACHE.put(address, destination);
+ destination.setLocalTerminus(localTerminusName);
+ }
+ else
+ {
+ destination = DESTINATION_CACHE.get(address);
+ if (destination == null)
+ {
+ destination = new DestinationImpl(address);
+ DESTINATION_CACHE.put(address, destination);
+ }
}
return destination;
}
@@ -82,4 +95,14 @@ public class DestinationImpl implements Destination, Queue, Topic
{
return getAddress();
}
+
+ void setLocalTerminus(final String localTerminus)
+ {
+ _localTerminus = localTerminus;
+ }
+
+ String getLocalTerminus()
+ {
+ return _localTerminus;
+ }
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index 96ee1e984d..d7bb546d7a 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
@@ -159,7 +160,8 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
try
{
- return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
+ String targetAddr = _destination.getLocalTerminus() != null ? _destination.getLocalTerminus() : UUID.randomUUID().toString();
+ return _session.getClientSession().createReceiver(_session.toAddress(_destination), targetAddr, AcknowledgeMode.ALO,
_linkName, _durable, getFilters(), null);
}
catch (ConnectionErrorException e)
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
index fed9b5904f..f2d0cb5b18 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -75,6 +75,7 @@ public abstract class MessageImpl implements Message
private boolean _isFromQueue;
private boolean _isFromTopic;
private long _expiration;
+ private DestinationImpl _replyTo;
protected MessageImpl(Header header,
MessageAnnotations messageAnnotations,
@@ -182,11 +183,12 @@ public abstract class MessageImpl implements Message
public DestinationImpl getJMSReplyTo() throws JMSException
{
- return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
+ return _replyTo != null ? _replyTo : toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
}
public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
{
+ _replyTo = (DestinationImpl) destination;
if( destination==null )
{
setReplyTo(null);
@@ -194,9 +196,16 @@ public abstract class MessageImpl implements Message
}
else
{
- DecodedDestination dd = toDecodedDestination(destination);
- setReplyTo(dd.getAddress());
- messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
+ if(_replyTo.getLocalTerminus() != null)
+ {
+ setReplyTo(_replyTo.getLocalTerminus());
+ }
+ else
+ {
+ DecodedDestination dd = toDecodedDestination(destination);
+ setReplyTo(dd.getAddress());
+ messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
+ }
}
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index e459575974..b12540d597 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -77,7 +77,11 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
{
try
{
- _sender = _session.getClientSession().createSender(_session.toAddress(_destination), new Session.SourceConfigurator()
+ final String sourceName = _destination.getLocalTerminus() != null
+ ? _destination.getLocalTerminus()
+ : UUID.randomUUID().toString();
+
+ _sender = _session.getClientSession().createSender(sourceName, _session.toAddress(_destination), new Session.SourceConfigurator()
{
public void configureSource(final Source source)
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
index cb56843a72..589ee5204f 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
@@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.Queue;
+import java.util.UUID;
import java.util.WeakHashMap;
public class QueueImpl extends DestinationImpl implements Queue
@@ -37,13 +38,24 @@ public class QueueImpl extends DestinationImpl implements Queue
return getAddress();
}
- public static synchronized QueueImpl createQueue(final String address)
+ public static synchronized QueueImpl createQueue(String address)
{
- QueueImpl queue = QUEUE_CACHE.get(address);
- if(queue == null)
+ QueueImpl queue;
+ if (address.endsWith("!!"))
{
+ address = address.substring(0, address.length() - 2);
+ String localTerminusName = UUID.randomUUID().toString();
queue = new QueueImpl(address);
- QUEUE_CACHE.put(address, queue);
+ queue.setLocalTerminus(localTerminusName);
+ }
+ else
+ {
+ queue = QUEUE_CACHE.get(address);
+ if(queue == null)
+ {
+ queue = new QueueImpl(address);
+ QUEUE_CACHE.put(address, queue);
+ }
}
return queue;
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
index 4e9e9d6a39..21161ff83f 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
@@ -24,6 +24,8 @@ import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import java.util.UUID;
+
public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
{
QueueReceiverImpl(final QueueImpl destination,
@@ -40,7 +42,11 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
{
try
{
- return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
+ final String targetAddr =
+ getDestination().getLocalTerminus() != null ? getDestination().getLocalTerminus() : UUID
+ .randomUUID().toString();
+ return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()),
+ targetAddr);
}
catch (ConnectionErrorException e)
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index 2318b8ba9b..bd67ff681a 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -460,7 +460,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
checkClosed();
checkNotTopicSession();
- return new QueueImpl(s);
+ return QueueImpl.valueOf(s);
}
public QueueReceiver createReceiver(final Queue queue) throws JMSException
@@ -488,7 +488,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
checkClosed();
checkNotQueueSession();
- return new TopicImpl(s);
+ return TopicImpl.valueOf(s);
}
public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
index 5292944075..edc583e233 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
@@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.Topic;
+import java.util.UUID;
import java.util.WeakHashMap;
public class TopicImpl extends DestinationImpl implements Topic
@@ -38,13 +39,24 @@ public class TopicImpl extends DestinationImpl implements Topic
return getAddress();
}
- public static synchronized TopicImpl createTopic(final String address)
+ public static synchronized TopicImpl createTopic(String address)
{
- TopicImpl topic = TOPIC_CACHE.get(address);
- if(topic == null)
+ TopicImpl topic;
+ if (address.endsWith("!!"))
{
+ address = address.substring(0, address.length() - 2);
+ String localTerminusName = UUID.randomUUID().toString();
topic = new TopicImpl(address);
- TOPIC_CACHE.put(address, topic);
+ topic.setLocalTerminus(localTerminusName);
+ }
+ else
+ {
+ topic = TOPIC_CACHE.get(address);
+ if(topic == null)
+ {
+ topic = new TopicImpl(address);
+ TOPIC_CACHE.put(address, topic);
+ }
}
return topic;
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
index 69e07f30a1..b89025a27b 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -19,6 +19,7 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import java.util.Map;
+import java.util.UUID;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
@@ -67,7 +68,9 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
try
{
String address = getSession().toAddress(getDestination());
- Receiver receiver = getSession().getClientSession().createReceiver(address,
+ String targetAddress = getDestination().getLocalTerminus() != null ? getDestination().getLocalTerminus() : UUID.randomUUID().toString();
+
+ Receiver receiver = getSession().getClientSession().createReceiver(address, targetAddress,
StdDistMode.COPY, AcknowledgeMode.ALO,
getLinkName(), isDurable(), getFilters(),
null);
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index cac4775b54..290895df60 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -78,8 +78,14 @@ public class Session
public Sender createSender(final String targetName, final SourceConfigurator configurator)
throws Sender.SenderCreationException, ConnectionClosedException
{
-
final String sourceName = UUID.randomUUID().toString();
+ return createSender(sourceName, targetName, configurator);
+ }
+
+ public Sender createSender(final String sourceName, final String targetName, final SourceConfigurator configurator)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+
return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false)
{
@Override
@@ -150,93 +156,118 @@ public class Session
public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
{
- return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+ return createReceiver(sourceAddr, UUID.randomUUID().toString(), null, AcknowledgeMode.ALO);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
throws ConnectionErrorException
{
- return createReceiver(queue, null, mode);
+ return createReceiver(queue, UUID.randomUUID().toString(), null, mode);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
throws ConnectionErrorException
{
- return createReceiver(queue, null, mode, linkName);
+ return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
throws ConnectionErrorException
{
- return createReceiver(queue, null, mode, linkName, isDurable);
+ return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
throws ConnectionErrorException
{
- return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+ return createReceiver(queue, (DistributionMode) null, mode, linkName, isDurable, filters, unsettled);
}
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
+ public Receiver createReceiver(final String queue, String targetName, final AcknowledgeMode mode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
throws ConnectionErrorException
{
- return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+ return createReceiver(queue, targetName, null, mode, linkName, isDurable, filters, unsettled);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+ boolean isDurable, Map<Binary, Outcome> unsettled)
throws ConnectionErrorException
{
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+ return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable, unsettled);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+
+ private synchronized Receiver createReceiver(final String sourceAddr,
+ final String targetAddr,
+ DistributionMode mode)
throws ConnectionErrorException
{
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+ return createReceiver(sourceAddr, targetAddr, mode, AcknowledgeMode.ALO);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode)
+ private synchronized Receiver createReceiver(final String sourceAddr,
+ final String targetAddr,
+ DistributionMode mode,
+ final AcknowledgeMode ackMode)
throws ConnectionErrorException
{
- return createReceiver(sourceAddr, mode, ackMode, null);
+ return createReceiver(sourceAddr, targetAddr, mode, ackMode, null);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName)
+ private synchronized Receiver createReceiver(final String sourceAddr,
+ final String targetAddr,
+ DistributionMode mode,
+ final AcknowledgeMode ackMode,
+ String linkName)
throws ConnectionErrorException
{
- return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+ return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, false);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+ private synchronized Receiver createReceiver(final String sourceAddr,
+ final String targetAddr,
+ DistributionMode mode,
+ final AcknowledgeMode ackMode,
+ String linkName,
+ boolean isDurable)
throws ConnectionErrorException
{
- return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+ return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, isDurable, null);
}
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled)
+ private synchronized Receiver createReceiver(final String sourceAddr,
+ final String targetAddr,
+ DistributionMode mode,
+ final AcknowledgeMode ackMode,
+ String linkName,
+ boolean isDurable,
+ Map<Binary, Outcome> unsettled)
throws ConnectionErrorException
{
return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
}
public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, UUID.randomUUID().toString(), mode, ackMode, linkName, isDurable, filters, unsettled);
+ }
+
+ public synchronized Receiver createReceiver(final String sourceAddr, String targetAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
throws ConnectionErrorException
{
final Target target = new Target();
+ target.setAddress(targetAddr);
final Source source = new Source();
source.setAddress(sourceAddr);
source.setDistributionMode(mode);
@@ -258,12 +289,17 @@ public class Session
public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
{
- return createReceiver(sourceAddr, StdDistMode.COPY);
+ return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.COPY);
}
public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
{
- return createReceiver(sourceAddr, StdDistMode.MOVE);
+ return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE);
+ }
+
+ public synchronized Receiver createMovingReceiver(final String sourceAddr, final String targetAddr) throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE);
}
public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 4ca1c7dac0..f796a4b2e3 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -43,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
@@ -393,9 +384,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
try
{
+ final String name;
+ if(getEndpoint().getTarget() instanceof Target)
+ {
+ Target target = (Target) getEndpoint().getTarget();
+ name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
+ }
+ else
+ {
+ name = getEndpoint().getName();
+ }
_consumer = _queue.addConsumer(_target,
messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class, getEndpoint().getName(), options);
+ Message_1_0.class, name, options);
}
catch (AMQException e)
{
diff --git a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 18795f58b8..1c1c72dd0b 100644
--- a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -336,11 +336,13 @@ class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementN
}
-
- for(ManagementNodeConsumer consumer : _consumers.values())
+ ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo());
+ if(consumer != null)
{
+ // TODO - check same owner
consumer.send(response);
}
+ // TODO - route to a queue
}