summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java93
1 files changed, 68 insertions, 25 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 04d5fef462..2a51657f60 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -43,11 +43,15 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -642,16 +647,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
- public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- FilterManager filters,
+ public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet)
+ final EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
+ try
+ {
+ return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+ {
+
+ @Override
+ public QueueConsumerImpl execute()
+ throws Exception
+ {
+
+ return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+ }
+ });
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+ ExistingConsumerPreventsExclusive | RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ // Should never happen
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+
+ private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+ FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<ConsumerImpl.Option> optionSet)
+ throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+ ExistingConsumerPreventsExclusive
+ {
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -763,7 +803,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
- filters,
+ filters,
messageClass,
optionSet);
@@ -812,19 +852,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
deliverAsync();
return consumer;
-
}
@Override
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing = true;
- super.beforeClose();
+ return super.beforeClose();
}
- synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+ void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -835,7 +874,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (removed)
{
- consumer.close();
+ consumer.closeAsync();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
@@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
- if(sub.acquires())
- {
- entry.unlockAcquisition();
- }
}
}
}
@@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
for (BindingImpl b : bindingCopy)
{
- b.delete();
+ // TODO - RG - Need to sort out bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_deleteTaskList.clear();
- close();
+ closeAsync();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
- if(sub.acquires())
- {
- node.unlockAcquisition();
- }
}
}
@@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && getNextAvailableEntry(sub) == null)
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
@@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
if(!message.isReferenced(this))
@@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return allowed;
}
- private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//=============
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_virtualHost.removeQueue(this);
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}