summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-10 16:53:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-10 16:53:37 +0000
commit641d37f3cfcb05146cbd99dda0b29ca593601762 (patch)
tree844cb6ac8f00ddcd510f7adae7c2d0c547eeabe1
parenta86065c9efe2907d7e310a35689fee132083efa8 (diff)
downloadqpid-python-641d37f3cfcb05146cbd99dda0b29ca593601762.tar.gz
Refactor use of futures
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665614 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java78
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java505
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java20
5 files changed, 384 insertions, 251 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index d6dff430ad..4c0bf41cbf 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
* with NO_SYN durability in case if such Node crushes.
*/
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+ put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
}});
public static final String PERMITTED_NODE_LIST = "permittedNodes";
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 6a4e048e5c..9f4402881b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -429,18 +429,14 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
protected ListenableFuture<Void> doDelete()
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
// get helpers before close. on close all children are closed and not available anymore
final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
- final ListenableFuture<Void> superFuture = super.doDelete();
- superFuture.addListener(new Runnable()
+ return doAfter(super.doDelete(),new Runnable()
{
@Override
public void run()
{
- try
- {
if (getConfigurationStore() != null)
{
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
@@ -458,15 +454,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
+ ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
}
}
- }
- finally
- {
- returnVal.set(null);
- }
+
}
- }, getTaskExecutor().getExecutor());
+ });
+
- return returnVal;
}
@Override
@@ -706,23 +698,15 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
final VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
- virtualHost.closeAsync().addListener(new Runnable()
+ return doAfter(virtualHost.closeAsync(), new Runnable()
{
@Override
public void run()
{
- try
- {
childRemoved(virtualHost);
- }
- finally
- {
- returnVal.set(null);
- }
+
}
- }, getTaskExecutor().getExecutor());
- return returnVal;
+ });
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 4e7cd4a151..4f8b472058 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,13 +38,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -607,15 +609,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@Override
public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
{
- return getTaskExecutor().run(new Task<Boolean>()
- {
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+ return makeBindingAsync(null, bindingKey, queue, arguments, false);
+ }
+ }));
- @Override
- public Boolean execute()
- {
- return makeBinding(null, bindingKey, queue, arguments, false);
- }
- });
}
@@ -624,12 +626,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final AMQQueue queue,
final Map<String, Object> arguments)
{
- final BindingImpl existingBinding = getBinding(bindingKey, queue);
- return makeBinding(existingBinding == null ? null : existingBinding.getId(),
- bindingKey,
- queue,
- arguments,
- true);
+ return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+ {
+ @Override
+ public ListenableFuture<Boolean> call() throws Exception
+ {
+
+ final BindingImpl existingBinding = getBinding(bindingKey, queue);
+ return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(),
+ bindingKey,
+ queue,
+ arguments,
+ true);
+ }
+ }));
}
@@ -680,7 +690,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
}
- private boolean makeBinding(UUID id,
+ private ListenableFuture<Boolean> makeBindingAsync(UUID id,
String bindingKey,
AMQQueue queue,
Map<String, Object> arguments,
@@ -714,23 +724,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
attributes.put(Binding.ID, id);
attributes.put(Binding.ARGUMENTS, arguments);
- BindingImpl b = new BindingImpl(attributes, queue, this);
- // TODO - RG - Fix Bindings
- b.createAsync(); // Must be called before addBinding as it resolves automated attributes.
+ final BindingImpl b = new BindingImpl(attributes, queue, this);
- addBinding(b);
- return true;
+ final SettableFuture<Boolean> returnVal = SettableFuture.create();
+
+ Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ addBinding(b);
+ returnVal.set(true);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes.
+
+ return returnVal;
}
else if(force)
{
Map<String,Object> oldArguments = existingMapping.getArguments();
existingMapping.setArguments(arguments);
onBindingUpdated(existingMapping, oldArguments);
- return true;
+ return Futures.immediateFuture(true);
}
else
{
- return false;
+ return Futures.immediateFuture(false);
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 996d3e2043..736a925943 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -41,15 +41,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -454,48 +457,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final ListenableFuture<Void> openAsync()
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ return doAttainState(exceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ return Futures.immediateFuture(null);
+ }
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ }
+ });
- _taskExecutor.run(new VoidTask()
+ }
+
+ protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+ {
+ final SettableFuture<T> returnVal = SettableFuture.create();
+
+ _taskExecutor.submit(new Task<Void>()
{
@Override
- public void execute()
+ public Void execute()
{
- if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ try
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler).addListener(
- new Runnable()
- {
- @Override
- public void run()
- {
- returnVal.set(null);
- }
- }, MoreExecutors.sameThreadExecutor()
- );
- }
- catch (RuntimeException e)
+ Futures.addCallback(action.call(), new FutureCallback<T>()
{
- exceptionHandler.handleException(e, AbstractConfiguredObject.this);
- returnVal.set(null);
- }
+ @Override
+ public void onSuccess(final T result)
+ {
+ returnVal.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ });
}
- else
+ catch (Exception e)
{
- returnVal.set(null);
+ returnVal.setException(e);
}
+ return null;
}
});
- return returnVal;
+ return returnVal;
}
@@ -558,13 +587,25 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
counter.incrementCount();
ListenableFuture<Void> close = child.closeAsync();
- close.addListener(new Runnable()
+ Futures.addCallback(close, new FutureCallback<Void>()
{
@Override
- public void run()
+ public void onSuccess(final Void result)
{
counter.decrementCount();
}
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ LOGGER.error("Exception occurred while closing "
+ + child.getClass().getSimpleName()
+ + " : '"
+ + child.getName()
+ + "'", t);
+ // No need to decrement counter as setting the exception will complete the future
+ returnVal.setException(t);
+ }
}, MoreExecutors.sameThreadExecutor());
}
});
@@ -598,70 +639,48 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final ListenableFuture<Void> closeAsync()
{
- LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
- if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
-
- final ListenableFuture<Void> beforeClose = beforeClose();
-
- if(beforeClose != null)
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- beforeClose.addListener(new Runnable()
+ LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
+
+ if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
- @Override
- public void run()
+
+ return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
{
- final ListenableFuture<Void> childCloseFuture = closeChildren();
- childCloseFuture.addListener(new Runnable()
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- @Override
- public void run()
+ return closeChildren();
+ }
+ }).then(new Runnable()
{
- onClose();
- unregister(false);
- LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
- returnVal.set(null);
- }
- }, getTaskExecutor().getExecutor());
- }
- }, getTaskExecutor().getExecutor());
- }
- else
- {
- final ListenableFuture<Void> childCloseFuture = closeChildren();
- childCloseFuture.addListener(new Runnable()
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+ }
+ });
+ }
+ else
{
- @Override
- public void run()
- {
+ LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
- onClose();
- unregister(false);
- LOGGER.debug("Closed "
- + AbstractConfiguredObject.this.getClass().getSimpleName()
- + " : "
- + getName());
- returnVal.set(null);
- }
- }, getTaskExecutor().getExecutor());
+ return Futures.immediateFuture(null);
+ }
}
+ });
- return returnVal;
-
-
- }
- else
- {
- LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
-
- return Futures.immediateFuture(null);
- }
}
protected ListenableFuture<Void> beforeClose()
{
- return null;
+ return Futures.immediateFuture(null);
}
protected void onClose()
@@ -675,15 +694,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final ListenableFuture<Void> createAsync()
{
- final SettableFuture<Void> returnVal = SettableFuture.create();
-
- _taskExecutor.run(new VoidTask()
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
-
@Override
- public void execute()
+ public ListenableFuture<Void> call() throws Exception
{
-
if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
{
final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
@@ -716,42 +731,23 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
new CreateExceptionHandler(true);
+
try
{
doCreation(true, unregisteringExceptionHandler);
doOpening(true, unregisteringExceptionHandler);
- Futures.addCallback(doAttainState(unregisteringExceptionHandler),
- new FutureCallback<Void>()
- {
- @Override
- public void onSuccess(final Void result)
- {
- returnVal.set(null);
- }
-
- @Override
- public void onFailure(final Throwable t)
- {
- if (t instanceof RuntimeException)
- {
- unregisteringExceptionHandler.handleException((RuntimeException) t,
- AbstractConfiguredObject.this);
- }
- returnVal.set(null);
- }
- },
- getTaskExecutor().getExecutor());
+ return doAttainState(unregisteringExceptionHandler);
}
catch (RuntimeException e)
{
unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
- returnVal.set(null);
}
}
+ return Futures.immediateFuture(null);
+
}
});
- return returnVal;
}
protected void validateOnCreate()
@@ -822,8 +818,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
catch(RuntimeException e)
{
- exceptionHandler.handleException(e, AbstractConfiguredObject.this);
- returnVal.set(null);
+ try
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
}
}
});
@@ -851,11 +854,18 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public void onFailure(final Throwable t)
{
- if(t instanceof RuntimeException)
+ try
{
- exceptionHandler.handleException((RuntimeException) t, configuredObject);
+ if (t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t,
+ configuredObject);
+ }
+ }
+ finally
+ {
+ counter.decrementCount();
}
- counter.decrementCount();
}
},getTaskExecutor().getExecutor());
@@ -1257,90 +1267,68 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
-
- final SettableFuture<Void> returnVal = SettableFuture.create();
- runTask(new Task<Void>()
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
+ {
+ return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public Void execute()
+ final State currentState = getState();
+ if (currentState != state)
{
+ notifyStateChanged(state, currentState);
+ }
- final State state = getState();
- final State currentDesiredState = getDesiredState();
- if(desiredState == currentDesiredState && desiredState != state)
- {
- attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- final State currentState = getState();
- if (currentState != state)
- {
- notifyStateChanged(state, currentState);
- }
- }
- finally
- {
- returnVal.set(null);
- }
- }
- }
- ,_taskExecutor.getExecutor());
- }
- else
- {
- try
- {
- authoriseSetDesiredState(desiredState);
- validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
- ConfiguredObject.DESIRED_STATE,
- desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
+ }
- if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
- {
- attributeSet(ConfiguredObject.DESIRED_STATE,
- currentDesiredState,
- desiredState);
+ });
- attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- }
- }
- finally
- {
- returnVal.set(null);
- }
- }
- }, _taskExecutor.getExecutor());
- }
- else
- {
- returnVal.set(null);
- }
- }
- catch (RuntimeException | Error e)
- {
- returnVal.set(null);
- throw e;
- }
+ }
+ else
+ {
+ authoriseSetDesiredState(desiredState);
+ validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
+
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
+
+ return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+
+ }
+ }
+ );
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ }
+ });
- }
- return null;
- }
- });
- return returnVal;
}
@Override
@@ -1727,11 +1715,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
doSync(deleteAsync());
}
- private void doSync(ListenableFuture<Void> async)
+ protected final <R> R doSync(ListenableFuture<R> async)
{
try
{
- async.get();
+ return async.get();
}
catch (InterruptedException e)
{
@@ -1762,12 +1750,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final ListenableFuture<Void> deleteAsync()
{
- /* if(getState() == State.UNINITIALIZED)
- {
- _desiredState = State.DELETED;
- }
- */ return setDesiredState(State.DELETED);
-
+ return setDesiredState(State.DELETED);
}
public final void start()
@@ -1870,6 +1853,128 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
doSync(setAttributesAsync(attributes));
}
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ second.run();
+ returnVal.set(null);
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
+
+ public static interface ChainedListenableFuture extends ListenableFuture<Void>
+ {
+ ChainedListenableFuture then(Runnable r);
+ ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+ }
+
+ public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+ {
+ private final Executor _exector;
+
+ public ChainedSettableFuture(final Executor executor)
+ {
+ _exector = executor;
+ }
+
+ @Override
+ public boolean set(Void value)
+ {
+ return super.set(value);
+ }
+
+ @Override
+ public boolean setException(Throwable throwable)
+ {
+ return super.setException(throwable);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Runnable r)
+ {
+ return doAfter(_exector, this, r);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+ {
+ return doAfter(_exector, this,r);
+ }
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ final ListenableFuture<Void> future = second.call();
+ Futures.addCallback(future, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
+
@Override
public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
index a76f79f8b0..9e8f0d1ca2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
@@ -21,16 +21,16 @@
package org.apache.qpid.test.utils;
import java.security.PrivilegedAction;
-import java.util.Map;
import java.util.Set;
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.security.SecurityManager;
-
-import javax.security.auth.Subject;
+import org.apache.qpid.server.util.Action;
public class InternalBrokerHolder implements BrokerHolder
{
@@ -50,7 +50,14 @@ public class InternalBrokerHolder implements BrokerHolder
{
LOGGER.info("Starting internal broker (same JVM)");
- _broker = new Broker();
+ _broker = new Broker(new Action<Integer>()
+ {
+ @Override
+ public void performAction(final Integer object)
+ {
+ _broker = null;
+ }
+ });
_broker.startup(options);
}
@@ -63,7 +70,10 @@ public class InternalBrokerHolder implements BrokerHolder
@Override
public Object run()
{
- _broker.shutdown();
+ if(_broker != null)
+ {
+ _broker.shutdown();
+ }
return null;
}