summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java101
1 files changed, 79 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index cb026e175b..61bbe6f732 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -63,12 +68,12 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public abstract class AbstractExchange<T extends AbstractExchange<T>>
extends AbstractConfiguredObject<T>
@@ -479,7 +484,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -593,9 +598,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
{
- return makeBinding(null, bindingKey, queue, arguments, false);
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+ return makeBindingAsync(null, bindingKey, queue, arguments, false);
+ }
+ }));
+
+
}
@Override
@@ -603,12 +617,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final AMQQueue queue,
final Map<String, Object> arguments)
{
- final BindingImpl existingBinding = getBinding(bindingKey, queue);
- return makeBinding(existingBinding == null ? null : existingBinding.getId(),
- bindingKey,
- queue,
- arguments,
- true);
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+
+ final BindingImpl existingBinding = getBinding(bindingKey, queue);
+ return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(),
+ bindingKey,
+ queue,
+ arguments,
+ true);
+ }
+ }));
}
@@ -634,7 +656,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
doRemoveBinding(b);
queue.removeBinding(b);
- b.delete();
+ // TODO - RG - Fix bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
}
@@ -651,7 +681,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
}
- private boolean makeBinding(UUID id,
+ private ListenableFuture<Boolean> makeBindingAsync(UUID id,
String bindingKey,
AMQQueue queue,
Map<String, Object> arguments,
@@ -685,22 +715,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
attributes.put(Binding.ID, id);
attributes.put(Binding.ARGUMENTS, arguments);
- BindingImpl b = new BindingImpl(attributes, queue, this);
- b.create(); // Must be called before addBinding as it resolves automated attributes.
+ final BindingImpl b = new BindingImpl(attributes, queue, this);
- addBinding(b);
- return true;
+ final SettableFuture<Boolean> returnVal = SettableFuture.create();
+
+ Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ addBinding(b);
+ returnVal.set(true);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes.
+
+ return returnVal;
}
else if(force)
{
Map<String,Object> oldArguments = existingMapping.getArguments();
existingMapping.setArguments(arguments);
onBindingUpdated(existingMapping, oldArguments);
- return true;
+ return Futures.immediateFuture(true);
}
else
{
- return false;
+ return Futures.immediateFuture(false);
}
}
}
@@ -723,22 +776,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -748,8 +803,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
catch (ExchangeIsAlternateException e)
{
- return;
+
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -860,4 +916,5 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return binding;
}
+
}