diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue')
3 files changed, 72 insertions, 14 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 0ba48387dd..664c544de4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -43,12 +43,15 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.TaskWithException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -650,16 +653,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override - public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - FilterManager filters, + public QueueConsumerImpl addConsumer(final ConsumerTarget target, + final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<ConsumerImpl.Option> optionSet) + final EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { + try + { + return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>() + { + @Override + public QueueConsumerImpl execute() + throws Exception + { + + return addConsumerInternal(target, filters, messageClass, consumerName, optionSet); + } + }); + } + catch (ExistingExclusiveConsumer | ConsumerAccessRefused | + ExistingConsumerPreventsExclusive | RuntimeException e) + { + throw e; + } + catch (Exception e) + { + // Should never happen + throw new ServerScopedRuntimeException(e); + } + + + } + + private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target, + FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<ConsumerImpl.Option> optionSet) + throws ExistingExclusiveConsumer, ConsumerAccessRefused, + ExistingConsumerPreventsExclusive + { if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); @@ -771,7 +809,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, - filters, + filters, messageClass, optionSet); @@ -820,7 +858,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> deliverAsync(); return consumer; - } @Override @@ -832,7 +869,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) { @@ -843,7 +880,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (removed) { - consumer.close(); + consumer.closeAsync(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); @@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> for (BindingImpl b : bindingCopy) { - b.delete(); + // TODO - RG - Need to sort out bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); @@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _deleteTaskList.clear(); - close(); + closeAsync(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return allowed; } - private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //============= @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _virtualHost.removeQueue(this); preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4ffb868537..a5225f3aa4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -191,7 +191,7 @@ class QueueConsumerImpl if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) { - close(); + closeAsync(); } final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener(); if(stateListener != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 19265ef453..b9ff6505fc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Port; @@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) |