diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-10 16:53:37 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-10 16:53:37 +0000 |
commit | 641d37f3cfcb05146cbd99dda0b29ca593601762 (patch) | |
tree | 844cb6ac8f00ddcd510f7adae7c2d0c547eeabe1 | |
parent | a86065c9efe2907d7e310a35689fee132083efa8 (diff) | |
download | qpid-python-641d37f3cfcb05146cbd99dda0b29ca593601762.tar.gz |
Refactor use of futures
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665614 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 384 insertions, 251 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index d6dff430ad..4c0bf41cbf 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan * with NO_SYN durability in case if such Node crushes. */ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + + put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)"); }}); public static final String PERMITTED_NODE_LIST = "permittedNodes"; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 6a4e048e5c..9f4402881b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -429,18 +429,14 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) protected ListenableFuture<Void> doDelete() { - final SettableFuture<Void> returnVal = SettableFuture.create(); // get helpers before close. on close all children are closed and not available anymore final Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); - final ListenableFuture<Void> superFuture = super.doDelete(); - superFuture.addListener(new Runnable() + return doAfter(super.doDelete(),new Runnable() { @Override public void run() { - try - { if (getConfigurationStore() != null) { getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); @@ -458,15 +454,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); } } - } - finally - { - returnVal.set(null); - } + } - }, getTaskExecutor().getExecutor()); + }); + - return returnVal; } @Override @@ -706,23 +698,15 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu final VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - final SettableFuture<Void> returnVal = SettableFuture.create(); - virtualHost.closeAsync().addListener(new Runnable() + return doAfter(virtualHost.closeAsync(), new Runnable() { @Override public void run() { - try - { childRemoved(virtualHost); - } - finally - { - returnVal.set(null); - } + } - }, getTaskExecutor().getExecutor()); - return returnVal; + }); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 4e7cd4a151..4f8b472058 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,13 +38,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ExchangeMessages; @@ -607,15 +609,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> @Override public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments) { - return getTaskExecutor().run(new Task<Boolean>() - { + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + return makeBindingAsync(null, bindingKey, queue, arguments, false); + } + })); - @Override - public Boolean execute() - { - return makeBinding(null, bindingKey, queue, arguments, false); - } - }); } @@ -624,12 +626,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final AMQQueue queue, final Map<String, Object> arguments) { - final BindingImpl existingBinding = getBinding(bindingKey, queue); - return makeBinding(existingBinding == null ? null : existingBinding.getId(), - bindingKey, - queue, - arguments, - true); + return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>() + { + @Override + public ListenableFuture<Boolean> call() throws Exception + { + + final BindingImpl existingBinding = getBinding(bindingKey, queue); + return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(), + bindingKey, + queue, + arguments, + true); + } + })); } @@ -680,7 +690,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return _bindingsMap.get(new BindingIdentifier(bindingKey,queue)); } - private boolean makeBinding(UUID id, + private ListenableFuture<Boolean> makeBindingAsync(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments, @@ -714,23 +724,45 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> attributes.put(Binding.ID, id); attributes.put(Binding.ARGUMENTS, arguments); - BindingImpl b = new BindingImpl(attributes, queue, this); - // TODO - RG - Fix Bindings - b.createAsync(); // Must be called before addBinding as it resolves automated attributes. + final BindingImpl b = new BindingImpl(attributes, queue, this); - addBinding(b); - return true; + final SettableFuture<Boolean> returnVal = SettableFuture.create(); + + Futures.addCallback(b.createAsync(), new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + addBinding(b); + returnVal.set(true); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes. + + return returnVal; } else if(force) { Map<String,Object> oldArguments = existingMapping.getArguments(); existingMapping.setArguments(arguments); onBindingUpdated(existingMapping, oldArguments); - return true; + return Futures.immediateFuture(true); } else { - return false; + return Futures.immediateFuture(false); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 996d3e2043..736a925943 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -41,15 +41,18 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -454,48 +457,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final ListenableFuture<Void> openAsync() { - final SettableFuture<Void> returnVal = SettableFuture.create(); + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + _openFailed = false; + OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); + try + { + doResolution(true, exceptionHandler); + doValidation(true, exceptionHandler); + doOpening(true, exceptionHandler); + return doAttainState(exceptionHandler); + } + catch (RuntimeException e) + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + return Futures.immediateFuture(null); + } + } + else + { + return Futures.immediateFuture(null); + } + + } + }); - _taskExecutor.run(new VoidTask() + } + + protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action) + { + final SettableFuture<T> returnVal = SettableFuture.create(); + + _taskExecutor.submit(new Task<Void>() { @Override - public void execute() + public Void execute() { - if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + try { - _openFailed = false; - OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); - try - { - doResolution(true, exceptionHandler); - doValidation(true, exceptionHandler); - doOpening(true, exceptionHandler); - doAttainState(exceptionHandler).addListener( - new Runnable() - { - @Override - public void run() - { - returnVal.set(null); - } - }, MoreExecutors.sameThreadExecutor() - ); - } - catch (RuntimeException e) + Futures.addCallback(action.call(), new FutureCallback<T>() { - exceptionHandler.handleException(e, AbstractConfiguredObject.this); - returnVal.set(null); - } + @Override + public void onSuccess(final T result) + { + returnVal.set(result); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }); } - else + catch (Exception e) { - returnVal.set(null); + returnVal.setException(e); } + return null; } }); - return returnVal; + return returnVal; } @@ -558,13 +587,25 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { counter.incrementCount(); ListenableFuture<Void> close = child.closeAsync(); - close.addListener(new Runnable() + Futures.addCallback(close, new FutureCallback<Void>() { @Override - public void run() + public void onSuccess(final Void result) { counter.decrementCount(); } + + @Override + public void onFailure(final Throwable t) + { + LOGGER.error("Exception occurred while closing " + + child.getClass().getSimpleName() + + " : '" + + child.getName() + + "'", t); + // No need to decrement counter as setting the exception will complete the future + returnVal.setException(t); + } }, MoreExecutors.sameThreadExecutor()); } }); @@ -598,70 +639,48 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public final ListenableFuture<Void> closeAsync() { - LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName()); - if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - final SettableFuture<Void> returnVal = SettableFuture.create(); - - final ListenableFuture<Void> beforeClose = beforeClose(); - - if(beforeClose != null) + @Override + public ListenableFuture<Void> call() throws Exception { - beforeClose.addListener(new Runnable() + LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName()); + + if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { - @Override - public void run() + + return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>() { - final ListenableFuture<Void> childCloseFuture = closeChildren(); - childCloseFuture.addListener(new Runnable() + @Override + public ListenableFuture<Void> call() throws Exception { - @Override - public void run() + return closeChildren(); + } + }).then(new Runnable() { - onClose(); - unregister(false); - LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); - returnVal.set(null); - } - }, getTaskExecutor().getExecutor()); - } - }, getTaskExecutor().getExecutor()); - } - else - { - final ListenableFuture<Void> childCloseFuture = closeChildren(); - childCloseFuture.addListener(new Runnable() + @Override + public void run() + { + onClose(); + unregister(false); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + } + }); + } + else { - @Override - public void run() - { + LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName()); - onClose(); - unregister(false); - LOGGER.debug("Closed " - + AbstractConfiguredObject.this.getClass().getSimpleName() - + " : " - + getName()); - returnVal.set(null); - } - }, getTaskExecutor().getExecutor()); + return Futures.immediateFuture(null); + } } + }); - return returnVal; - - - } - else - { - LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName()); - - return Futures.immediateFuture(null); - } } protected ListenableFuture<Void> beforeClose() { - return null; + return Futures.immediateFuture(null); } protected void onClose() @@ -675,15 +694,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final ListenableFuture<Void> createAsync() { - final SettableFuture<Void> returnVal = SettableFuture.create(); - - _taskExecutor.run(new VoidTask() + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - @Override - public void execute() + public ListenableFuture<Void> call() throws Exception { - if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) { final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); @@ -716,42 +731,23 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true); + try { doCreation(true, unregisteringExceptionHandler); doOpening(true, unregisteringExceptionHandler); - Futures.addCallback(doAttainState(unregisteringExceptionHandler), - new FutureCallback<Void>() - { - @Override - public void onSuccess(final Void result) - { - returnVal.set(null); - } - - @Override - public void onFailure(final Throwable t) - { - if (t instanceof RuntimeException) - { - unregisteringExceptionHandler.handleException((RuntimeException) t, - AbstractConfiguredObject.this); - } - returnVal.set(null); - } - }, - getTaskExecutor().getExecutor()); + return doAttainState(unregisteringExceptionHandler); } catch (RuntimeException e) { unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this); - returnVal.set(null); } } + return Futures.immediateFuture(null); + } }); - return returnVal; } protected void validateOnCreate() @@ -822,8 +818,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } catch(RuntimeException e) { - exceptionHandler.handleException(e, AbstractConfiguredObject.this); - returnVal.set(null); + try + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + catch(Throwable t) + { + returnVal.setException(t); + } } } }); @@ -851,11 +854,18 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public void onFailure(final Throwable t) { - if(t instanceof RuntimeException) + try { - exceptionHandler.handleException((RuntimeException) t, configuredObject); + if (t instanceof RuntimeException) + { + exceptionHandler.handleException((RuntimeException) t, + configuredObject); + } + } + finally + { + counter.decrementCount(); } - counter.decrementCount(); } },getTaskExecutor().getExecutor()); @@ -1257,90 +1267,68 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private ListenableFuture<Void> setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - - final SettableFuture<Void> returnVal = SettableFuture.create(); - runTask(new Task<Void>() + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + final State state = getState(); + final State currentDesiredState = getDesiredState(); + if(desiredState == currentDesiredState && desiredState != state) + { + return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable() + { + @Override + public void run() { - @Override - public Void execute() + final State currentState = getState(); + if (currentState != state) { + notifyStateChanged(state, currentState); + } - final State state = getState(); - final State currentDesiredState = getDesiredState(); - if(desiredState == currentDesiredState && desiredState != state) - { - attainStateIfOpenedOrReopenFailed().addListener(new Runnable() - { - @Override - public void run() - { - try - { - final State currentState = getState(); - if (currentState != state) - { - notifyStateChanged(state, currentState); - } - } - finally - { - returnVal.set(null); - } - } - } - ,_taskExecutor.getExecutor()); - } - else - { - try - { - authoriseSetDesiredState(desiredState); - validateChange(createProxyForValidation(Collections.<String, Object>singletonMap( - ConfiguredObject.DESIRED_STATE, - desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE)); + } - if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) - { - attributeSet(ConfiguredObject.DESIRED_STATE, - currentDesiredState, - desiredState); + }); - attainStateIfOpenedOrReopenFailed().addListener(new Runnable() - { - @Override - public void run() - { - try - { - if (getState() == desiredState) - { - notifyStateChanged(state, desiredState); - } - } - finally - { - returnVal.set(null); - } - } - }, _taskExecutor.getExecutor()); - } - else - { - returnVal.set(null); - } - } - catch (RuntimeException | Error e) - { - returnVal.set(null); - throw e; - } + } + else + { + authoriseSetDesiredState(desiredState); + validateChange(createProxyForValidation(Collections.<String, Object>singletonMap( + ConfiguredObject.DESIRED_STATE, + desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE)); + + if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) + { + attributeSet(ConfiguredObject.DESIRED_STATE, + currentDesiredState, + desiredState); + + return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable() + { + @Override + public void run() + { + if (getState() == desiredState) + { + notifyStateChanged(state, desiredState); + } + + } + } + ); + } + else + { + return Futures.immediateFuture(null); + } + } + + } + }); - } - return null; - } - }); - return returnVal; } @Override @@ -1727,11 +1715,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im doSync(deleteAsync()); } - private void doSync(ListenableFuture<Void> async) + protected final <R> R doSync(ListenableFuture<R> async) { try { - async.get(); + return async.get(); } catch (InterruptedException e) { @@ -1762,12 +1750,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final ListenableFuture<Void> deleteAsync() { - /* if(getState() == State.UNINITIALIZED) - { - _desiredState = State.DELETED; - } - */ return setDesiredState(State.DELETED); - + return setDesiredState(State.DELETED); } public final void start() @@ -1870,6 +1853,128 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im doSync(setAttributesAsync(attributes)); } + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + second.run(); + returnVal.set(null); + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + + public static interface ChainedListenableFuture extends ListenableFuture<Void> + { + ChainedListenableFuture then(Runnable r); + ChainedListenableFuture then(Callable<ListenableFuture<Void>> r); + } + + public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture + { + private final Executor _exector; + + public ChainedSettableFuture(final Executor executor) + { + _exector = executor; + } + + @Override + public boolean set(Void value) + { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public ChainedListenableFuture then(final Runnable r) + { + return doAfter(_exector, this, r); + } + + @Override + public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r) + { + return doAfter(_exector, this,r); + } + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + final ListenableFuture<Void> future = second.call(); + Futures.addCallback(future, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + returnVal.set(null); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + @Override public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index a76f79f8b0..9e8f0d1ca2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -21,16 +21,16 @@ package org.apache.qpid.test.utils; import java.security.PrivilegedAction; -import java.util.Map; import java.util.Set; +import javax.security.auth.Subject; + import org.apache.log4j.Logger; import org.apache.qpid.server.Broker; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.security.SecurityManager; - -import javax.security.auth.Subject; +import org.apache.qpid.server.util.Action; public class InternalBrokerHolder implements BrokerHolder { @@ -50,7 +50,14 @@ public class InternalBrokerHolder implements BrokerHolder { LOGGER.info("Starting internal broker (same JVM)"); - _broker = new Broker(); + _broker = new Broker(new Action<Integer>() + { + @Override + public void performAction(final Integer object) + { + _broker = null; + } + }); _broker.startup(options); } @@ -63,7 +70,10 @@ public class InternalBrokerHolder implements BrokerHolder @Override public Object run() { - _broker.shutdown(); + if(_broker != null) + { + _broker.shutdown(); + } return null; } |