diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 39 |
1 files changed, 17 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 6a959df440..bc670bd848 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; @@ -33,14 +34,17 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -66,7 +70,7 @@ public abstract class AbstractExchange implements Exchange private VirtualHost _virtualHost; - private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>(); /** * Whether the exchange is automatically deleted once all queues have detached from it @@ -138,6 +142,12 @@ public abstract class AbstractExchange implements Exchange if(_closed.compareAndSet(false,true)) { + List<Binding> bindings = new ArrayList<Binding>(_bindings); + for(Binding binding : bindings) + { + removeBinding(binding); + } + if(_alternateExchange != null) { _alternateExchange.removeReference(this); @@ -145,9 +155,9 @@ public abstract class AbstractExchange implements Exchange CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED()); - for(Task task : _closeTaskList) + for(Action<Exchange> task : _closeTaskList) { - task.onClose(this); + task.performAction(this); } _closeTaskList.clear(); } @@ -300,12 +310,12 @@ public abstract class AbstractExchange implements Exchange return !_referrers.isEmpty(); } - public void addCloseTask(final Task task) + public void addCloseTask(final Action<Exchange> task) { _closeTaskList.add(task); } - public void removeCloseTask(final Task task) + public void removeCloseTask(final Action<Exchange> task) { _closeTaskList.remove(task); } @@ -421,7 +431,7 @@ public abstract class AbstractExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final BaseQueue.PostEnqueueAction postEnqueueAction) + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) { List<? extends BaseQueue> queues = route(message, instanceProperties); @@ -579,8 +589,6 @@ public abstract class AbstractExchange implements Exchange { doRemoveBinding(b); queue.removeBinding(b); - removeCloseTask(b); - queue.removeQueueDeleteTask(b); if (b.isDurable()) { @@ -659,8 +667,6 @@ public abstract class AbstractExchange implements Exchange DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); } - queue.addQueueDeleteTask(b); - addCloseTask(b); queue.addBinding(b); doAddBinding(b); b.logCreation(); @@ -673,7 +679,7 @@ public abstract class AbstractExchange implements Exchange } } - private final class BindingImpl extends Binding implements AMQQueue.Task, Task + private final class BindingImpl extends Binding { private final BindingLogSubject _logSubject; //TODO : persist creation time @@ -689,12 +695,6 @@ public abstract class AbstractExchange implements Exchange } - - public void doTask(final AMQQueue queue) throws AMQException - { - removeBinding(this); - } - public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException { removeBinding(this); @@ -729,11 +729,6 @@ public abstract class AbstractExchange implements Exchange } - public static interface Task - { - public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; - } - } |