summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java21
1 files changed, 11 insertions, 10 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 2fff1856c7..f7867d6178 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -66,6 +66,7 @@ import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.consumer.Consumer;
@@ -95,7 +96,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
- private final AMQQueue _queue;
+ private final MessageSource _queue;
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
@@ -121,7 +122,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_queue = ((QueueDestination) _destination).getQueue();
- if(_queue.getAvailableAttributes().contains("topic"))
+ if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
@@ -217,7 +218,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
- _queue = _vhost.createQueue(
+ queue = _vhost.createQueue(
UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
name,
isDurable,
@@ -229,8 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
else
{
- _queue = queue;
- List<Binding> bindings = _queue.getBindings();
+ List<Binding> bindings = queue.getBindings();
List<Binding> bindingsToRemove = new ArrayList<Binding>();
for(Binding existingBinding : bindings)
{
@@ -313,15 +313,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
}
}
+ _queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- exchange.addBinding(binding, _queue,null);
+ exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
{
final String queueName = name;
- final AMQQueue tempQueue = _queue;
+ final AMQQueue tempQueue = queue;
final Action<Connection_1_0> deleteQueueTask =
new Action<Connection_1_0>()
@@ -345,7 +346,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- _queue.addQueueDeleteTask(new Action<AMQQueue>()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
public void performAction(AMQQueue queue)
{
@@ -356,7 +357,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
});
}
- qd = new QueueDestination(_queue);
+ qd = new QueueDestination(queue);
}
catch (AMQSecurityException e)
{
@@ -454,7 +455,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
try
{
- _queue.getVirtualHost().removeQueue(_queue);
+ _vhost.removeQueue((AMQQueue)_queue);
}
catch(AMQException e)
{