diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 2fff1856c7..f7867d6178 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -66,6 +66,7 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.consumer.Consumer; @@ -95,7 +96,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>(); private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); private Runnable _closeAction; - private final AMQQueue _queue; + private final MessageSource _queue; public SendingLink_1_0(final SendingLinkAttachment linkAttachment, @@ -121,7 +122,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _queue = ((QueueDestination) _destination).getQueue(); - if(_queue.getAvailableAttributes().contains("topic")) + if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } @@ -217,7 +218,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - _queue = _vhost.createQueue( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, @@ -229,8 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } else { - _queue = queue; - List<Binding> bindings = _queue.getBindings(); + List<Binding> bindings = queue.getBindings(); List<Binding> bindingsToRemove = new ArrayList<Binding>(); for(Binding existingBinding : bindings) { @@ -313,15 +313,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } } + _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - exchange.addBinding(binding, _queue,null); + exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); if(!isDurable) { final String queueName = name; - final AMQQueue tempQueue = _queue; + final AMQQueue tempQueue = queue; final Action<Connection_1_0> deleteQueueTask = new Action<Connection_1_0>() @@ -345,7 +346,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - _queue.addQueueDeleteTask(new Action<AMQQueue>() + queue.addQueueDeleteTask(new Action<AMQQueue>() { public void performAction(AMQQueue queue) { @@ -356,7 +357,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS }); } - qd = new QueueDestination(_queue); + qd = new QueueDestination(queue); } catch (AMQSecurityException e) { @@ -454,7 +455,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - _queue.getVirtualHost().removeQueue(_queue); + _vhost.removeQueue((AMQQueue)_queue); } catch(AMQException e) { |