diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 101 |
1 files changed, 79 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cb026e175b..61bbe6f732 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -36,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -63,12 +68,12 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public abstract class AbstractExchange<T extends AbstractExchange<T>> extends AbstractConfiguredObject<T> @@ -479,7 +484,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); @@ -593,9 +598,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments) { - return makeBinding(null, bindingKey, queue, arguments, false); + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + return makeBindingAsync(null, bindingKey, queue, arguments, false); + } + })); + + } @Override @@ -603,12 +617,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final AMQQueue queue, final Map<String, Object> arguments) { - final BindingImpl existingBinding = getBinding(bindingKey, queue); - return makeBinding(existingBinding == null ? null : existingBinding.getId(), - bindingKey, - queue, - arguments, - true); + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + + final BindingImpl existingBinding = getBinding(bindingKey, queue); + return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(), + bindingKey, + queue, + arguments, + true); + } + })); } @@ -634,7 +656,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> doRemoveBinding(b); queue.removeBinding(b); - b.delete(); + // TODO - RG - Fix bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } } @@ -651,7 +681,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return _bindingsMap.get(new BindingIdentifier(bindingKey,queue)); } - private boolean makeBinding(UUID id, + private ListenableFuture<Boolean> makeBindingAsync(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments, @@ -685,22 +715,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> attributes.put(Binding.ID, id); attributes.put(Binding.ARGUMENTS, arguments); - BindingImpl b = new BindingImpl(attributes, queue, this); - b.create(); // Must be called before addBinding as it resolves automated attributes. + final BindingImpl b = new BindingImpl(attributes, queue, this); - addBinding(b); - return true; + final SettableFuture<Boolean> returnVal = SettableFuture.create(); + + Futures.addCallback(b.createAsync(), new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + addBinding(b); + returnVal.set(true); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes. + + return returnVal; } else if(force) { Map<String,Object> oldArguments = existingMapping.getArguments(); existingMapping.setArguments(arguments); onBindingUpdated(existingMapping, oldArguments); - return true; + return Futures.immediateFuture(true); } else { - return false; + return Futures.immediateFuture(false); } } } @@ -723,22 +776,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -748,8 +803,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } catch (ExchangeIsAlternateException e) { - return; + } + return Futures.immediateFuture(null); } @Override @@ -860,4 +916,5 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return binding; } + } |