diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java | 93 |
1 files changed, 68 insertions, 25 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 04d5fef462..2a51657f60 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -43,11 +43,15 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.TaskWithException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.TransportException; public abstract class AbstractQueue<X extends AbstractQueue<X>> @@ -642,16 +647,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override - public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - FilterManager filters, + public QueueConsumerImpl addConsumer(final ConsumerTarget target, + final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<ConsumerImpl.Option> optionSet) + final EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { + try + { + return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>() + { + + @Override + public QueueConsumerImpl execute() + throws Exception + { + + return addConsumerInternal(target, filters, messageClass, consumerName, optionSet); + } + }); + } + catch (ExistingExclusiveConsumer | ConsumerAccessRefused | + ExistingConsumerPreventsExclusive | RuntimeException e) + { + throw e; + } + catch (Exception e) + { + // Should never happen + throw new ServerScopedRuntimeException(e); + } + + } + + private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target, + FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<ConsumerImpl.Option> optionSet) + throws ExistingExclusiveConsumer, ConsumerAccessRefused, + ExistingConsumerPreventsExclusive + { if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); @@ -763,7 +803,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, - filters, + filters, messageClass, optionSet); @@ -812,19 +852,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> deliverAsync(); return consumer; - } @Override - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { _closing = true; - super.beforeClose(); + return super.beforeClose(); } - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) { @@ -835,7 +874,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (removed) { - consumer.close(); + consumer.closeAsync(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); @@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); - if(sub.acquires()) - { - entry.unlockAcquisition(); - } } } } @@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> for (BindingImpl b : bindingCopy) { - b.delete(); + // TODO - RG - Need to sort out bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); @@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _deleteTaskList.clear(); - close(); + closeAsync(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); - if(sub.acquires()) - { - node.unlockAcquisition(); - } } } @@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (consumerDone) { sub.flushBatched(); - if (lastLoop && getNextAvailableEntry(sub) == null) + boolean noMore = getNextAvailableEntry(sub) == null; + if (lastLoop && noMore) { sub.queueEmpty(); } @@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } if(!message.isReferenced(this)) @@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return allowed; } - private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //============= @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _virtualHost.removeQueue(this); preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } |