summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java39
1 files changed, 17 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 6a959df440..bc670bd848 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -33,14 +34,17 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -66,7 +70,7 @@ public abstract class AbstractExchange implements Exchange
private VirtualHost _virtualHost;
- private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +142,12 @@ public abstract class AbstractExchange implements Exchange
if(_closed.compareAndSet(false,true))
{
+ List<Binding> bindings = new ArrayList<Binding>(_bindings);
+ for(Binding binding : bindings)
+ {
+ removeBinding(binding);
+ }
+
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
@@ -145,9 +155,9 @@ public abstract class AbstractExchange implements Exchange
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Task task : _closeTaskList)
+ for(Action<Exchange> task : _closeTaskList)
{
- task.onClose(this);
+ task.performAction(this);
}
_closeTaskList.clear();
}
@@ -300,12 +310,12 @@ public abstract class AbstractExchange implements Exchange
return !_referrers.isEmpty();
}
- public void addCloseTask(final Task task)
+ public void addCloseTask(final Action<Exchange> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Task task)
+ public void removeCloseTask(final Action<Exchange> task)
{
_closeTaskList.remove(task);
}
@@ -421,7 +431,7 @@ public abstract class AbstractExchange implements Exchange
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
@@ -579,8 +589,6 @@ public abstract class AbstractExchange implements Exchange
{
doRemoveBinding(b);
queue.removeBinding(b);
- removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
@@ -659,8 +667,6 @@ public abstract class AbstractExchange implements Exchange
DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- queue.addQueueDeleteTask(b);
- addCloseTask(b);
queue.addBinding(b);
doAddBinding(b);
b.logCreation();
@@ -673,7 +679,7 @@ public abstract class AbstractExchange implements Exchange
}
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ private final class BindingImpl extends Binding
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -689,12 +695,6 @@ public abstract class AbstractExchange implements Exchange
}
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
@@ -729,11 +729,6 @@ public abstract class AbstractExchange implements Exchange
}
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
}