summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java74
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java10
3 files changed, 72 insertions, 14 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0ba48387dd..664c544de4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -43,12 +43,15 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -650,16 +653,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
- public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- FilterManager filters,
+ public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet)
+ final EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
+ try
+ {
+ return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+ {
+ @Override
+ public QueueConsumerImpl execute()
+ throws Exception
+ {
+
+ return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+ }
+ });
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+ ExistingConsumerPreventsExclusive | RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ // Should never happen
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
+ }
+
+ private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+ FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<ConsumerImpl.Option> optionSet)
+ throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+ ExistingConsumerPreventsExclusive
+ {
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -771,7 +809,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
- filters,
+ filters,
messageClass,
optionSet);
@@ -820,7 +858,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
deliverAsync();
return consumer;
-
}
@Override
@@ -832,7 +869,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
- synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+ void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -843,7 +880,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (removed)
{
- consumer.close();
+ consumer.closeAsync();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
@@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
for (BindingImpl b : bindingCopy)
{
- b.delete();
+ // TODO - RG - Need to sort out bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_deleteTaskList.clear();
- close();
+ closeAsync();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return allowed;
}
- private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//=============
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_virtualHost.removeQueue(this);
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 4ffb868537..a5225f3aa4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -191,7 +191,7 @@ class QueueConsumerImpl
if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
- close();
+ closeAsync();
}
final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
index 19265ef453..b9ff6505fc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)