diff options
author | Keith Wall <kwall@apache.org> | 2015-03-09 17:12:14 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-03-09 17:12:14 +0000 |
commit | 98faeab2840203c8e4eb4526afe0fd20a596aa28 (patch) | |
tree | 665f6493dcca389d39b0a5496ad4a0eaab160ef8 | |
parent | 10b21b20fbd892d19ae64084165ec8942f864eac (diff) | |
download | qpid-python-98faeab2840203c8e4eb4526afe0fd20a596aa28.tar.gz |
Add sync/async varients to most ACO methods
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665306 13f79535-47bb-0310-9956-ffa450edef68
59 files changed, 1494 insertions, 406 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index dfbdce4399..926e9a956f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -24,11 +24,12 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.security.AccessControlException; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.sleepycat.je.rep.MasterStateException; - import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; @@ -150,7 +151,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB } @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { String nodeName = getName(); @@ -170,6 +171,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB { throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e); } + + return Futures.immediateFuture(null); } protected void afterSetRole() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 2000897e87..6a4e048e5c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LogWriteException; import com.sleepycat.je.rep.NodeState; @@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); + throw new IllegalStateException("Intruder node detected: " + nodeAddress); } } @@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer()); environmentFacade.setPermittedNodes(_permittedNodes); } + + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - try - { - super.doStop(); - } - finally + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> superFuture = super.doStop(); + Futures.addCallback(superFuture, new FutureCallback<Void>() { - closeEnvironment(); + @Override + public void onSuccess(final Void result) + { + doFinally(); + } - // closing the environment does not cause a state change. Adjust the role - // so that our observers will see DETACHED rather than our previous role in the group. - _lastRole.set(NodeRole.DETACHED); - attributeSet(ROLE, _role, NodeRole.DETACHED); - } + @Override + public void onFailure(final Throwable t) + { + doFinally(); + } + + private void doFinally() + { + try + { + closeEnvironment(); + + // closing the environment does not cause a state change. Adjust the role + // so that our observers will see DETACHED rather than our previous role in the group. + _lastRole.set(NodeRole.DETACHED); + attributeSet(ROLE, _role, NodeRole.DETACHED); + } + finally + { + returnVal.set(null); + } + + } + }); + return returnVal; } private void closeEnvironment() @@ -397,43 +427,60 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { - // get helpers before close. on close all children are closed and not available anymore - Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); - super.doDelete(); - - if (getConfigurationStore() != null) - { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); - } + final SettableFuture<Void> returnVal = SettableFuture.create(); - if (getState() == State.DELETED && !helpers.isEmpty()) + // get helpers before close. on close all children are closed and not available anymore + final Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); + final ListenableFuture<Void> superFuture = super.doDelete(); + superFuture.addListener(new Runnable() { - try + @Override + public void run() { - new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); - } - catch(DatabaseException e) - { - LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() - + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + try + { + if (getConfigurationStore() != null) + { + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); + } + + if (getState() == State.DELETED && !helpers.isEmpty()) + { + try + { + new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); + } + catch(DatabaseException e) + { + LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() + + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + } + } + } + finally + { + returnVal.set(null); + } } - } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade(); if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster() && replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1) { - super.deleteVirtualHostIfExists(); + return super.deleteVirtualHostIfExists(); } else { - closeVirtualHostIfExist(); + return closeVirtualHostIfExist(); } } @@ -553,7 +600,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); getConfigurationStore().upgradeStoreStructure(); @@ -640,7 +687,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); Map<String, Object> hostAttributes = new HashMap<>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); @@ -654,13 +701,32 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - protected void closeVirtualHostIfExist() + protected ListenableFuture<Void> closeVirtualHostIfExist() { - VirtualHost<?,?,?> virtualHost = getVirtualHost(); + final VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - virtualHost.close(); - childRemoved(virtualHost); + final SettableFuture<Void> returnVal = SettableFuture.create(); + virtualHost.closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + try + { + childRemoved(virtualHost); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -687,15 +753,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu onReplica(); break; case DETACHED: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; case UNKNOWN: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; default: LOGGER.error("Unexpected state change: " + state); } } + catch (InterruptedException | ExecutionException e) + { + throw new ServerScopedRuntimeException(e); + } finally { NodeRole newRole = NodeRole.fromJeState(state); @@ -1137,7 +1207,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { - close(); + closeAsync(); } finally { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index 8c389e6d22..bc5d30a0f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -110,8 +110,8 @@ public class Broker implements BrokerShutdownProvider { if(_systemConfig != null) { - ListenableFuture<Void> closeResult = _systemConfig.close(); - closeResult.get(5000l, TimeUnit.MILLISECONDS); + ListenableFuture<Void> closeResult = _systemConfig.closeAsync(); + closeResult.get(30000l, TimeUnit.MILLISECONDS); } _taskExecutor.stop(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 6012e2e8db..0463bb64a3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.exchange.AbstractExchange; @@ -195,7 +198,7 @@ public class BindingImpl } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { @@ -208,12 +211,14 @@ public class BindingImpl deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index fecb4de7f5..0f59494850 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -49,6 +49,7 @@ public class TaskExecutorImpl implements TaskExecutor private volatile Thread _taskThread; private final AtomicBoolean _running = new AtomicBoolean(); private volatile ExecutorService _executor; + private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor(); @Override @@ -68,7 +69,7 @@ public class TaskExecutorImpl implements TaskExecutor @Override public Thread newThread(Runnable r) { - _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME); + _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this); return _taskThread; } }); @@ -281,7 +282,7 @@ public class TaskExecutorImpl implements TaskExecutor @Override public Executor getExecutor() { - return _executor; + return _wrappedExecutor; } public boolean isTaskExecutorThread() @@ -380,4 +381,41 @@ public class TaskExecutorImpl implements TaskExecutor return get(); } } + + private class ImmediateIfSameThreadExecutor implements Executor + { + + @Override + public void execute(final Runnable command) + { + if(isTaskExecutorThread() + || (_executor == null && (Thread.currentThread() instanceof TaskThread + && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this))) + { + command.run(); + } + else + { + _executor.execute(command); + } + + } + } + + private static class TaskThread extends Thread + { + + private final TaskExecutorImpl _taskExecutor; + + public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor) + { + super(r, name); + _taskExecutor = taskExecutor; + } + + public TaskExecutorImpl getTaskExecutor() + { + return _taskExecutor; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 83784d4b25..e17eca8614 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -67,7 +67,7 @@ public interface ConsumerImpl boolean seesRequeues(); - ListenableFuture<Void> close(); + void close(); boolean trySendLock(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cf23e3dd91..4e7cd4a151 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -37,10 +37,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ExchangeMessages; @@ -602,9 +605,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments) { - return makeBinding(null, bindingKey, queue, arguments, false); + return getTaskExecutor().run(new Task<Boolean>() + { + + @Override + public Boolean execute() + { + return makeBinding(null, bindingKey, queue, arguments, false); + } + }); + } @Override @@ -643,7 +655,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> doRemoveBinding(b); queue.removeBinding(b); - b.delete(); + // TODO - RG - Fix bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } } @@ -695,7 +715,8 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> attributes.put(Binding.ARGUMENTS, arguments); BindingImpl b = new BindingImpl(attributes, queue, this); - b.create(); // Must be called before addBinding as it resolves automated attributes. + // TODO - RG - Fix Bindings + b.createAsync(); // Must be called before addBinding as it resolves automated attributes. addBinding(b); return true; @@ -732,22 +753,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -757,8 +780,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } catch (ExchangeIsAlternateException | RequiredExchangeException e) { - return; + } + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index 3e377ebaa6..be98665df8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -107,8 +107,4 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex void bindingRemoved(ExchangeImpl exchange, BindingImpl binding); } - public void addBindingListener(BindingListener listener); - - public void removeBindingListener(BindingListener listener); - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 2269999e1d..57eb16c0be 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -43,12 +43,14 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -169,7 +171,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this); - @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" ) + @ManagedAttributeField private State _desiredState; private boolean _openComplete; private boolean _openFailed; @@ -446,24 +448,58 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public final void open() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(openAsync()); + } + + + public final ListenableFuture<Void> openAsync() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + + _taskExecutor.run(new VoidTask() { - _openFailed = false; - OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); - try - { - doResolution(true, exceptionHandler); - doValidation(true, exceptionHandler); - doOpening(true, exceptionHandler); - doAttainState(exceptionHandler); - } - catch(RuntimeException e) + + @Override + public void execute() { - exceptionHandler.handleException(e, this); + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + _openFailed = false; + OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); + try + { + doResolution(true, exceptionHandler); + doValidation(true, exceptionHandler); + doOpening(true, exceptionHandler); + doAttainState(exceptionHandler).addListener( + new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, MoreExecutors.sameThreadExecutor() + ); + } + catch (RuntimeException e) + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + } + else + { + returnVal.set(null); + } } - } + }); + return returnVal; + } + + public void registerWithParents() { for(ConfiguredObject<?> parent : _parents.values()) @@ -475,7 +511,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private static class ChildCounter + private class ChildCounter { private final AtomicInteger _count = new AtomicInteger(); private final Runnable _task; @@ -501,8 +537,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im protected final ListenableFuture<Void> closeChildren() { - LOGGER.debug("KWDEBUG closing children"); - final SettableFuture<Void> returnVal = SettableFuture.create(); final ChildCounter counter = new ChildCounter(new Runnable() { @@ -510,6 +544,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public void run() { returnVal.set(null); + LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() ); + } }); counter.incrementCount(); @@ -521,7 +557,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public void performAction(final ConfiguredObject<?> child) { counter.incrementCount(); - ListenableFuture<Void> close = child.close(); + ListenableFuture<Void> close = child.closeAsync(); close.addListener(new Runnable() { @Override @@ -554,8 +590,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } @Override - public final ListenableFuture<Void> close() + public void close() + { + doSync(closeAsync()); + } + + @Override + public final ListenableFuture<Void> closeAsync() { + LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName()); if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { final SettableFuture<Void> returnVal = SettableFuture.create(); @@ -577,6 +620,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { onClose(); unregister(false); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); returnVal.set(null); } }, getTaskExecutor().getExecutor()); @@ -591,8 +635,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public void run() { + onClose(); unregister(false); + LOGGER.debug("Closed " + + AbstractConfiguredObject.this.getClass().getSimpleName() + + " : " + + getName()); returnVal.set(null); } }, getTaskExecutor().getExecutor()); @@ -604,6 +653,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } else { + LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName()); + return Futures.immediateFuture(null); } } @@ -619,48 +670,88 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void create() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(createAsync()); + } + + public final ListenableFuture<Void> createAsync() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + + _taskExecutor.run(new VoidTask() { - final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); - if(currentUser != null) - { - String currentUserName = currentUser.getName(); - _attributes.put(LAST_UPDATED_BY, currentUserName); - _attributes.put(CREATED_BY, currentUserName); - _lastUpdatedBy = currentUserName; - _createdBy = currentUserName; - } - final long currentTime = System.currentTimeMillis(); - _attributes.put(LAST_UPDATED_TIME, currentTime); - _attributes.put(CREATED_TIME, currentTime); - _lastUpdatedTime = currentTime; - _createdTime = currentTime; - CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); - try - { - doResolution(true, createExceptionHandler); - doValidation(true, createExceptionHandler); - validateOnCreate(); - registerWithParents(); - } - catch(RuntimeException e) + @Override + public void execute() { - createExceptionHandler.handleException(e, this); - } - AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true); - try - { - doCreation(true, unregisteringExceptionHandler); - doOpening(true, unregisteringExceptionHandler); - doAttainState(unregisteringExceptionHandler); - } - catch(RuntimeException e) - { - unregisteringExceptionHandler.handleException(e, this); + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); + if (currentUser != null) + { + String currentUserName = currentUser.getName(); + _attributes.put(LAST_UPDATED_BY, currentUserName); + _attributes.put(CREATED_BY, currentUserName); + _lastUpdatedBy = currentUserName; + _createdBy = currentUserName; + } + final long currentTime = System.currentTimeMillis(); + _attributes.put(LAST_UPDATED_TIME, currentTime); + _attributes.put(CREATED_TIME, currentTime); + _lastUpdatedTime = currentTime; + _createdTime = currentTime; + + CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); + try + { + doResolution(true, createExceptionHandler); + doValidation(true, createExceptionHandler); + validateOnCreate(); + registerWithParents(); + } + catch (RuntimeException e) + { + createExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } + + final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = + new CreateExceptionHandler(true); + try + { + doCreation(true, unregisteringExceptionHandler); + doOpening(true, unregisteringExceptionHandler); + Futures.addCallback(doAttainState(unregisteringExceptionHandler), + new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + returnVal.set(null); + } + + @Override + public void onFailure(final Throwable t) + { + if (t instanceof RuntimeException) + { + unregisteringExceptionHandler.handleException((RuntimeException) t, + AbstractConfiguredObject.this); + } + returnVal.set(null); + } + }, + getTaskExecutor().getExecutor()); + } + catch (RuntimeException e) + { + unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + } } - } + }); + + return returnVal; } protected void validateOnCreate() @@ -710,8 +801,33 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { } - private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) + private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + try + { + attainState().addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + } + catch(RuntimeException e) + { + returnVal.set(null); + throw e; + } + } + }); + counter.incrementCount(); applyToChildren(new Action<ConfiguredObject<?>>() { @Override @@ -719,22 +835,36 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (child instanceof AbstractConfiguredObject) { - AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; + final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; if (configuredObject._dynamicState.get() == DynamicState.OPENED) { - try - { - configuredObject.doAttainState(exceptionHandler); - } - catch (RuntimeException e) - { - exceptionHandler.handleException(e, configuredObject); - } + counter.incrementCount(); + Futures.addCallback(configuredObject.doAttainState(exceptionHandler), + new FutureCallback() + { + @Override + public void onSuccess(final Object result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + if(t instanceof RuntimeException) + { + exceptionHandler.handleException((RuntimeException) t, configuredObject); + } + counter.decrementCount(); + } + },getTaskExecutor().getExecutor()); + } } } }); - attainState(); + counter.decrementCount(); + return returnVal; } protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler) @@ -990,16 +1120,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void attainStateIfOpenedOrReopenFailed() + private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed() { if (_openComplete || getDesiredState() == State.DELETED) { - attainState(); + return attainState(); } else if (_openFailed) { - open(); + return openAsync(); } + return Futures.immediateFuture(null); } protected void onOpen() @@ -1007,10 +1138,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - protected void attainState() + protected ListenableFuture<Void> attainState() { State currentState = getState(); State desiredState = getDesiredState(); + ListenableFuture<Void> returnVal; if(currentState != desiredState) { Method stateChangingMethod = getStateChangeMethod(currentState, desiredState); @@ -1018,7 +1150,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { try { - stateChangingMethod.invoke(this); + returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this); } catch (IllegalAccessException e) { @@ -1038,7 +1170,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying); } } + else + { + returnVal = Futures.immediateFuture(null); + } } + else + { + returnVal = Futures.immediateFuture(null); + } + return returnVal; } private Method getStateChangeMethod(final State currentState, final State desiredState) @@ -1113,46 +1254,93 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - private State setDesiredState(final State desiredState) + private ListenableFuture<Void> setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - return runTask(new Task<State>() + final SettableFuture<Void> returnVal = SettableFuture.create(); + runTask(new Task<Void>() { @Override - public State execute() + public Void execute() { - State state = getState(); - if(desiredState == getDesiredState() && desiredState != state) + final State state = getState(); + final State currentDesiredState = getDesiredState(); + if(desiredState == currentDesiredState && desiredState != state) { - attainStateIfOpenedOrReopenFailed(); - final State currentState = getState(); - if (currentState != state) + attainStateIfOpenedOrReopenFailed().addListener(new Runnable() { - notifyStateChanged(state, currentState); + @Override + public void run() + { + try + { + final State currentState = getState(); + if (currentState != state) + { + notifyStateChanged(state, currentState); + } + } + finally + { + returnVal.set(null); + } + } } - return currentState; + ,_taskExecutor.getExecutor()); } else { - authoriseSetDesiredState(desiredState); + try + { + authoriseSetDesiredState(desiredState); + validateChange(createProxyForValidation(Collections.<String, Object>singletonMap( + ConfiguredObject.DESIRED_STATE, + desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE)); - setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE, - desiredState)); + if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) + { + attributeSet(ConfiguredObject.DESIRED_STATE, + currentDesiredState, + desiredState); - if (getState() == desiredState) - { - notifyStateChanged(state, desiredState); - return desiredState; + attainStateIfOpenedOrReopenFailed().addListener(new Runnable() + { + @Override + public void run() + { + try + { + if (getState() == desiredState) + { + notifyStateChanged(state, desiredState); + } + } + finally + { + returnVal.set(null); + } + + } + }, _taskExecutor.getExecutor()); + } + else + { + returnVal.set(null); + } } - else + catch (RuntimeException | Error e) { - return getState(); + returnVal.set(null); + throw e; } + } + return null; } }); + return returnVal; } @Override @@ -1531,20 +1719,67 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void stop() { - setDesiredState(State.STOPPED); + doSync(setDesiredState(State.STOPPED)); } public final void delete() { - if(getState() == State.UNINITIALIZED) + doSync(deleteAsync()); + } + + private void doSync(ListenableFuture<Void> async) + { + try + { + async.get(); + } + catch (InterruptedException e) + { + throw new ServerScopedRuntimeException(e); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else if(cause instanceof Error) + { + throw (Error) cause; + } + else if(cause != null) + { + throw new ServerScopedRuntimeException(cause); + } + else + { + throw new ServerScopedRuntimeException(e); + } + + } + } + + public final ListenableFuture<Void> deleteAsync() + { + /* if(getState() == State.UNINITIALIZED) { _desiredState = State.DELETED; } - setDesiredState(State.DELETED); + */ return setDesiredState(State.DELETED); } - public final void start() { setDesiredState(State.ACTIVE); } + public final void start() + { + doSync(startAsync()); + } + + public ListenableFuture<Void> startAsync() + { + return setDesiredState(State.ACTIVE); + } + protected void deleted() { @@ -1629,19 +1864,49 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _taskExecutor.run(task); } + @Override + public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + doSync(setAttributesAsync(attributes)); + } @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { + final Map<String,Object> updateAttributes = new HashMap<>(attributes); + Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE); runTask(new VoidTask() { @Override public void execute() { authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); - changeAttributes(attributes); + validateChange(createProxyForValidation(attributes), attributes.keySet()); + + changeAttributes(updateAttributes); } }); + if(desiredState != null) + { + State state; + if(desiredState instanceof State) + { + state = (State)desiredState; + } + else if(desiredState instanceof String) + { + state = State.valueOf((String)desiredState); + } + else + { + throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State"); + } + return setDesiredState(state); + } + else + { + return Futures.immediateFuture(null); + } } protected void authoriseSetAttributes(final ConfiguredObject<?> proxyForValidation, @@ -1652,7 +1917,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im protected void changeAttributes(final Map<String, Object> attributes) { - validateChange(createProxyForValidation(attributes), attributes.keySet()); Collection<String> names = getAttributeNames(); for (String name : names) { @@ -2193,7 +2457,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (source.getState() != State.DELETED) { - source.delete(); + // TODO - RG - This isn't right :-( + source.deleteAsync(); } } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java index 5bf5e337ad..f97d2dfe14 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.model; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectDependency; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf return instance; } + + @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + final SettableFuture<X> returnVal = SettableFuture.create(); + final X instance = createInstance(attributes, parents); + final ListenableFuture<Void> createFuture = instance.createAsync(); + createFuture.addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(instance); + } + }, MoreExecutors.sameThreadExecutor()); + return returnVal; + } + protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents); public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java index b421c5aaf1..c6ac7d4073 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java @@ -31,6 +31,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; @@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - protected void activate() + protected ListenableFuture<Void> activate() { final EventLogger eventLogger = _eventLogger; - EventLogger startupLogger; + final EventLogger startupLogger; if (isStartupLoggedToSystemOut()) { //Create the composite (logging+SystemOut MessageLogger to be used during startup @@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this); upgrader.perform(); - Broker broker = getBroker(); + final Broker broker = getBroker(); broker.setEventLogger(startupLogger); - broker.open(); - - if (broker.getState() == State.ACTIVE) - { - startupLogger.message(BrokerMessages.READY()); - broker.setEventLogger(eventLogger); - } - + final SettableFuture<Void> returnVal = SettableFuture.create(); + broker.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + + if (broker.getState() == State.ACTIVE) + { + startupLogger.message(BrokerMessages.READY()); + broker.setEventLogger(eventLogger); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java index 944ed97ccc..c56698c60c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java @@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X> @ManagedStatistic long getMatches(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 395cb52fcd..d2ab317f0e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -238,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObject... otherParents); void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + Class<? extends ConfiguredObject> getCategoryClass(); Class<? extends ConfiguredObject> getTypeClass(); @@ -250,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObjectRecord asObjectRecord(); void open(); + ListenableFuture<Void> openAsync(); + + void close(); + ListenableFuture<Void> closeAsync(); - ListenableFuture<Void> close(); + ListenableFuture<Void> deleteAsync(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java index 7d4023862b..ed7c841344 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; @@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory <X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 5026df0e19..82da0fd206 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory return factory.create(this, attributes, parents); } + + @Override + public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes); + + return factory.createAsync(this, attributes, parents); + } + + @Override public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String, Object> attributes) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index d0c6fb041e..a93e6a602f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -40,6 +40,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.plugin.ConfiguredObjectRegistration; @@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry { if(m.isAnnotationPresent(StateTransition.class)) { - if(m.getParameterTypes().length == 0) + if(ListenableFuture.class.isAssignableFrom(m.getReturnType())) { - m.setAccessible(true); - StateTransition annotation = m.getAnnotation(StateTransition.class); + if (m.getParameterTypes().length == 0) + { + m.setAccessible(true); + StateTransition annotation = m.getAnnotation(StateTransition.class); + + for (State state : annotation.currentState()) + { + addStateTransition(state, annotation.desiredState(), m, map); + } - for(State state : annotation.currentState()) + } + else { - addStateTransition(state, annotation.desiredState(), m, map); + throw new ServerScopedRuntimeException( + "A state transition method must have no arguments. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } - } else { - throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria."); + throw new ServerScopedRuntimeException( + "A state transition method must return a ListenableFuture. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index b28441438d..1c245363a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> //children Collection<Session> getSessions(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index 7318a58640..999a3594b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + @ManagedObject public interface Port<X extends Port<X>> extends ConfiguredObject<X> { @@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> void start(); + ListenableFuture<Void> startAsync(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index cc758ba7c9..c2338c08d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void stop(); - void delete(); - String getRedirectHost(AmqpPort<?> port); public static interface Transaction diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 28eea21093..dfbe8b12ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -35,6 +35,9 @@ import java.util.regex.Pattern; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.common.QpidProperties; @@ -235,13 +238,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if(_parent.isManagementMode()) { - _managementModeAuthenticationProvider.open(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + _managementModeAuthenticationProvider.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + activateWithoutManagementMode(); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } + else + { + activateWithoutManagementMode(); + return Futures.immediateFuture(null); + } + } + private void activateWithoutManagementMode() + { boolean hasBrokerAnyErroredChildren = false; for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass())) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index a4dbd7d5e5..8bcbba9ac4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -160,11 +160,28 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - asyncClose(); - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + asyncClose().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index 327b7ddfe9..67533f8244 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -32,6 +32,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -147,7 +150,8 @@ public class FileBasedGroupProviderImpl GroupAdapter groupAdapter = new GroupAdapter(attrMap); principals.add(groupAdapter); groupAdapter.registerWithParents(); - groupAdapter.open(); + // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread + groupAdapter.openAsync(); } } @@ -265,7 +269,7 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_groupDatabase != null) { @@ -282,29 +286,48 @@ public class FileBasedGroupProviderImpl throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath())); } } + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - File file = new File(getPath()); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } - - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + File file = new File(getPath()); + if (file.exists()) + { + if (!file.delete()) + { + throw new IllegalConfigurationException("Cannot delete group file"); + } + } + + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } public Set<Principal> getGroupPrincipalsForUser(String username) @@ -377,9 +400,10 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -396,7 +420,8 @@ public class FileBasedGroupProviderImpl attrMap.put(GroupMember.NAME, principal.getName()); GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap); groupMemberAdapter.registerWithParents(); - groupMemberAdapter.open(); + // todo - this will be safe, but the synchronous open should not be called from the management thread + groupMemberAdapter.openAsync(); members.add(groupMemberAdapter); } _groupPrincipal = new GroupPrincipal(getName()); @@ -459,12 +484,13 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName()); _groupDatabase.removeGroup(getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -522,19 +548,21 @@ public class FileBasedGroupProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName()); _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java index 7046f2973e..c95b3ab804 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java @@ -37,16 +37,17 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.util.BaseAction; -import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.codehaus.jackson.type.TypeReference; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; public class FileSystemPreferencesProviderImpl @@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_store != null) { @@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl { throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() ); } + return Futures.immediateFuture(null); } @Override @@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl } @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED) - private void doQuiesce() + private ListenableFuture<Void> doQuiesce() { if(_store != null) { _store.close(); } setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + if(_store != null) + { + _store.close(); + _store.delete(); + deleted(); + _authenticationProvider.setPreferencesProvider(null); + + } + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - if(_store != null) - { - _store.close(); - _store.delete(); - deleted(); - _authenticationProvider.setPreferencesProvider(null); + return returnVal; - } - setState(State.DELETED); } @StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE ) - private void restart() + private ListenableFuture<Void> restart() { if (_store == null) { @@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl _store.open(); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 7c9b439e93..cb412e8d41 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 21827ffe58..5c53eed509 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -28,6 +28,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -228,14 +231,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + setState(State.DELETED); + returnVal.set(null); + + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -246,12 +259,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo setState(State.ERRORED); throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e); } + return Futures.immediateFuture(null); } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java index 870621f292..5c3000db4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java index 0d16b4ffc7..cd0187034e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java @@ -20,19 +20,23 @@ */ package org.apache.qpid.server.plugin; +import java.util.Map; + +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import java.util.Map; - public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable { Class<? super X> getCategoryClass(); X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, ConfiguredObjectRecord record, ConfiguredObject<?>... parents); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 0ba48387dd..664c544de4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -43,12 +43,15 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.TaskWithException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -650,16 +653,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override - public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - FilterManager filters, + public QueueConsumerImpl addConsumer(final ConsumerTarget target, + final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<ConsumerImpl.Option> optionSet) + final EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { + try + { + return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>() + { + @Override + public QueueConsumerImpl execute() + throws Exception + { + + return addConsumerInternal(target, filters, messageClass, consumerName, optionSet); + } + }); + } + catch (ExistingExclusiveConsumer | ConsumerAccessRefused | + ExistingConsumerPreventsExclusive | RuntimeException e) + { + throw e; + } + catch (Exception e) + { + // Should never happen + throw new ServerScopedRuntimeException(e); + } + + + } + + private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target, + FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<ConsumerImpl.Option> optionSet) + throws ExistingExclusiveConsumer, ConsumerAccessRefused, + ExistingConsumerPreventsExclusive + { if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); @@ -771,7 +809,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, - filters, + filters, messageClass, optionSet); @@ -820,7 +858,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> deliverAsync(); return consumer; - } @Override @@ -832,7 +869,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) { @@ -843,7 +880,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (removed) { - consumer.close(); + consumer.closeAsync(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); @@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> for (BindingImpl b : bindingCopy) { - b.delete(); + // TODO - RG - Need to sort out bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); @@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _deleteTaskList.clear(); - close(); + closeAsync(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return allowed; } - private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //============= @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _virtualHost.removeQueue(this); preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4ffb868537..a5225f3aa4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -191,7 +191,7 @@ class QueueConsumerImpl if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) { - close(); + closeAsync(); } final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener(); if(stateListener != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 19265ef453..b9ff6505fc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Port; @@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java index aa5f55dfb4..5f585f3d88 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java @@ -38,6 +38,9 @@ import java.util.Set; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -98,7 +101,7 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -113,12 +116,14 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl> } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java index fb161fef4e..df1cbd0493 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java @@ -38,6 +38,9 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -98,7 +101,7 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -139,12 +142,14 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java index f6298ab383..c2779415d1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java @@ -57,6 +57,8 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.xml.bind.DatatypeConverter; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -184,7 +186,7 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -202,12 +204,14 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java index 993d689fb6..397e226699 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java @@ -45,6 +45,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.security.auth.x500.X500Principal; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -172,7 +174,7 @@ public class NonJavaTrustStoreImpl } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -215,12 +217,14 @@ public class NonJavaTrustStoreImpl } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java index 7f98468726..9befcebe5b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -175,13 +178,14 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED ) - protected void startQuiesced() + protected ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -199,11 +203,11 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica throw e; } } - + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { String providerName = getName(); @@ -219,15 +223,50 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } } - close(); - if (_preferencesProvider != null) - { - _preferencesProvider.delete(); - } - deleted(); + final SettableFuture<Void> returnVal = SettableFuture.create(); - setState(State.DELETED); + final ListenableFuture<Void> future = closeAsync(); + future.addListener(new Runnable() + { + @Override + public void run() + { + if (_preferencesProvider != null) + { + _preferencesProvider.deleteAsync().addListener(new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + } + else + { + try + { + deleted(); + + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java index 78b2b60fe9..7046fc4885 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java @@ -98,22 +98,15 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend @Override public void deleteUser(final String user) throws AccountNotFoundException { - runTask(new VoidTaskWithException<AccountNotFoundException>() + final ManagedUser authUser = getUser(user); + if(authUser != null) { - @Override - public void execute() throws AccountNotFoundException - { - final ManagedUser authUser = getUser(user); - if(authUser != null) - { - authUser.delete(); - } - else - { - throw new AccountNotFoundException("No such user: '" + user + "'"); - } - } - }); + authUser.delete(); + } + else + { + throw new AccountNotFoundException("No such user: '" + user + "'"); + } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java index c8884e15a8..6f36ec7d11 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java @@ -27,6 +27,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; @@ -96,10 +99,11 @@ class ManagedUser extends AbstractConfiguredObject<ManagedUser> implements User< } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _authenticationManager.getUserMap().remove(getName()); deleted(); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index cf165ff4af..a4dbcdc284 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -40,6 +40,9 @@ import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal super.onOpen(); _principalDatabase = createDatabase(); initialise(); - List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); - for (Principal user : users) - { - PrincipalAdapter principalAdapter = new PrincipalAdapter(user); - principalAdapter.registerWithParents(); - principalAdapter.open(); - _userMap.put(user, principalAdapter); - } } + protected abstract PrincipalDatabase createDatabase(); @@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal return _principalDatabase; } + @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) + public ListenableFuture<Void> activate() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); + _userMap.clear(); + if(!users.isEmpty()) + { + for (final Principal user : users) + { + final PrincipalAdapter principalAdapter = new PrincipalAdapter(user); + principalAdapter.registerWithParents(); + principalAdapter.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + _userMap.put(user, principalAdapter); + if (_userMap.size() == users.size()) + { + setState(State.ACTIVE); + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + } + + return returnVal; + } + else + { + return Futures.immediateFuture(null); + } + } - @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - public void doDelete() + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED) + public ListenableFuture<Void> doDelete() { File file = new File(_path); if (file.exists() && file.isFile()) @@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -465,13 +497,14 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -489,7 +522,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { LOGGER.warn("Failed to delete user " + _user, e); } - + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java index 98607d2490..96d32f4179 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.security.group; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Group; @@ -77,16 +80,18 @@ public class GroupImpl extends AbstractConfiguredObject<GroupImpl> implements Gr @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java index ea17db6ce7..a86d380b2b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.security.group; import java.security.Principal; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Group; import org.apache.qpid.server.model.GroupMember; @@ -61,15 +64,17 @@ public class GroupMemberImpl extends AbstractConfiguredObject<GroupMemberImpl> i @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java index ecc166f8fc..7dc032cc90 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -89,16 +92,18 @@ public class GroupProviderImpl extends AbstractConfiguredObject<GroupProviderImp } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4329f000ec..4a2d30ac9f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -38,11 +38,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -388,27 +392,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return isStoreEmptyHandler.isEmpty(); } - protected void createDefaultExchanges() + protected ListenableFuture<Void> createDefaultExchanges() { - Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>() { + private static final int TOTAL_STANDARD_EXCHANGES = 4; + private final AtomicInteger _createdExchangeCount = new AtomicInteger(); + private SettableFuture<Void> _future = SettableFuture.create(); + @Override - public Void run() + public ListenableFuture<Void> run() { addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - return null; + return _future; + } + + private void standardExchangeCreated() + { + if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES) + { + _future.set(null); + } } - void addStandardExchange(String name, String type) + ListenableFuture<Void> addStandardExchange(String name, String type) { + Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, name); attributes.put(Exchange.TYPE, type); attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); - childAdded(addExchange(attributes)); + final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes); + final SettableFuture<Void> returnVal = SettableFuture.create(); + Futures.addCallback(future, new FutureCallback<ExchangeImpl>() + { + @Override + public void onSuccess(final ExchangeImpl result) + { + try + { + childAdded(result); + } + finally + { + standardExchangeCreated(); + } + + } + + @Override + public void onFailure(final Throwable t) + { + standardExchangeCreated(); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } }); } @@ -777,6 +819,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } + private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes) + throws ExchangeExistsException, ReservedExchangeNameException, + NoFactoryForTypeException + { + try + { + ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this); + return result; + } + catch (DuplicateNameException e) + { + throw new ExchangeExistsException(getExchange(e.getName())); + } + + } + + @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException @@ -809,7 +868,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override protected ListenableFuture<Void> beforeClose() { - _logger.debug("KWDEBUG setting state to UNAVAILABLE"); setState(State.UNAVAILABLE); return super.beforeClose(); @@ -818,7 +876,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override protected void onClose() { - _logger.debug("KWDEBUG onClose"); //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); @@ -830,7 +887,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private void closeMessageStore() { - _logger.debug("KWDEBUG closeMessageStore"); if (getMessageStore() != null) { try @@ -1312,38 +1368,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - // TODO - need to deal with async close children - closeChildren(); - shutdownHouseKeeping(); - closeMessageStore(); - setState(State.STOPPED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeChildren().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + shutdownHouseKeeping(); + closeMessageStore(); + setState(State.STOPPED); + + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { + final SettableFuture<Void> returnVal = SettableFuture.create(); String hostName = getName(); - close(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + MessageStore ms = getMessageStore(); + if (ms != null) + { + try + { + ms.onDelete(AbstractVirtualHost.this); + } + catch (Exception e) + { + _logger.warn("Exception occurred on message store deletion", e); + } + } + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - MessageStore ms = getMessageStore(); - if (ms != null) - { - try - { - ms.onDelete(this); - } - catch (Exception e) - { - _logger.warn("Exception occurred on message store deletion", e); - } - } - deleted(); - setState(State.DELETED); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -1532,7 +1626,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) - private void onActivate() + private ListenableFuture<Void> onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory()); @@ -1552,9 +1646,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if (isStoreEmpty()) { - createDefaultExchanges(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + createDefaultExchanges().addListener(new Runnable() + { + @Override + public void run() + { + postCreateDefaultExchangeTasks(); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + return returnVal; } + else + { + postCreateDefaultExchangeTasks(); + return Futures.immediateFuture(null); + } + } + + private void postCreateDefaultExchangeTasks() + { if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1589,9 +1702,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); } } + private static class ChildCounter + { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - private void onRestart() + private ListenableFuture<Void> onRestart() { resetStatistics(); @@ -1622,6 +1758,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte new GenericRecoverer(this).recover(records); + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + onActivate().addListener( + new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor() + ); + } + }); + counter.incrementCount(); Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() { @Override @@ -1632,14 +1787,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public void performAction(final ConfiguredObject<?> object) { - object.open(); + counter.incrementCount(); + object.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + counter.decrementCount(); + } + }, getTaskExecutor().getExecutor()); } }); return null; } }); - - onActivate(); + counter.decrementCount(); + return returnVal; } private class FileSystemSpaceChecker extends HouseKeepingTask diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 03c30a9cd4..fd73963b68 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -29,6 +29,8 @@ import java.util.Map; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -68,7 +70,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -107,15 +109,21 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard if (host != null) { final VirtualHost<?,?,?> recoveredHost = host; - Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - recoveredHost.open(); - return null; - } - }); + final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> run() + { + return recoveredHost.openAsync(); + + } + }); + return openFuture; + } + else + { + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index ba915e3427..bccf284b34 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -38,7 +38,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -122,16 +125,47 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { + final SettableFuture<Void> returnVal = SettableFuture.create(); + try { - activate(); - setState(State.ACTIVE); + Futures.addCallback(activate(), + new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + setState(State.ACTIVE); + } + finally + { + returnVal.set(null); + } + + } + + @Override + public void onFailure(final Throwable t) + { + + setState(State.ERRORED); + returnVal.set(null); + if (_broker.isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", t); + } + } + }, getTaskExecutor().getExecutor() + ); } catch(RuntimeException e) { setState(State.ERRORED); + returnVal.set(null); if (_broker.isManagementMode()) { LOGGER.warn("Failed to make " + this + " active.", e); @@ -141,6 +175,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< throw e; } } + return returnVal; } @Override @@ -183,40 +218,73 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { + final SettableFuture<Void> returnVal = SettableFuture.create(); setState(State.DELETED); deleteVirtualHostIfExists(); - final ListenableFuture<Void> closeFuture = close(); - deleted(); - DurableConfigurationStore configurationStore = getConfigurationStore(); - if (configurationStore != null) + final ListenableFuture<Void> closeFuture = closeAsync(); + closeFuture.addListener(new Runnable() { - configurationStore.onDelete(this); - } + @Override + public void run() + { + try + { + deleted(); + DurableConfigurationStore configurationStore = getConfigurationStore(); + if (configurationStore != null) + { + configurationStore.onDelete(AbstractVirtualHostNode.this); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + return returnVal; + } - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); if (virtualHost != null) { - virtualHost.delete(); + return virtualHost.deleteAsync(); + } + else + { + return Futures.immediateFuture(null); } } @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - stopAndSetStateTo(State.STOPPED); + return stopAndSetStateTo(State.STOPPED); } - protected void stopAndSetStateTo(State stoppedState) + protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState) { - // TODO - deal with async close children - closeChildren(); - closeConfigurationStoreSafely(); - setState(stoppedState); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + closeConfigurationStoreSafely(); + setState(stoppedState); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override @@ -311,7 +379,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< protected abstract DurableConfigurationStore createConfigurationStore(); - protected abstract void activate(); + protected abstract ListenableFuture<Void> activate(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java index c94d113514..8a160f83d7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { try { @@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeImpl throw e; } } + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 889984eb67..b2d35f690f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -151,6 +151,7 @@ public class HeadersBindingTest extends TestCase _count++; _queue = mock(AMQQueue.class); TaskExecutor executor = new CurrentThreadTaskExecutor(); + executor.start(); VirtualHostImpl vhost = mock(VirtualHostImpl.class); when(_queue.getVirtualHost()).thenReturn(vhost); when(_queue.getModel()).thenReturn(BrokerModel.getInstance()); @@ -158,6 +159,7 @@ public class HeadersBindingTest extends TestCase when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); final EventLogger eventLogger = new EventLogger(); when(vhost.getEventLogger()).thenReturn(eventLogger); + when(vhost.getTaskExecutor()).thenReturn(executor); _exchange = mock(ExchangeImpl.class); when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS); when(_exchange.getEventLogger()).thenReturn(eventLogger); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index c21a386eaa..26db573e41 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.model; import static java.util.Arrays.asList; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -33,12 +34,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.security.AccessControlException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -55,6 +59,7 @@ import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -413,7 +418,30 @@ public class VirtualHostTest extends QpidTestCase private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost) { final AMQConnectionModel connection = mock(AMQConnectionModel.class); + final List<Action<?>> tasks = new ArrayList<>(); + final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class); + Answer answer = new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + return tasks.add(deleteTaskCaptor.getValue()); + } + }; + doAnswer(answer).when(connection).addDeleteTask(deleteTaskCaptor.capture()); when(connection.getVirtualHost()).thenReturn(virtualHost); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + for(Action action : tasks) + { + action.performAction(connection); + } + return null; + } + }).when(connection).closeAsync(any(AMQConstant.class),anyString()); when(connection.getRemoteAddressString()).thenReturn("peer:1234"); return connection; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java index 0b35ba9330..5c91052956 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java @@ -26,6 +26,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -127,15 +130,17 @@ public class TestConfiguredObject extends AbstractConfiguredObject } @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { setState(State.DELETED); + return Futures.immediateFuture(null); } public boolean isOpened() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index c1a9240f2c..7667267df3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -31,14 +31,12 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import org.apache.log4j.Logger; import org.mockito.ArgumentCaptor; @@ -56,7 +54,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java index 889097f850..74fe371b2f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java @@ -22,17 +22,13 @@ package org.apache.qpid.server.virtualhost; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doThrow; -import java.io.File; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -50,7 +46,6 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.verification.VerificationMode; public class AbstractVirtualHostTest extends QpidTestCase { @@ -90,7 +85,7 @@ public class AbstractVirtualHostTest extends QpidTestCase { if (_taskExecutor != null) { - _taskExecutor.stopImmediately(); + _taskExecutor.stop(); } } finally @@ -179,7 +174,7 @@ public class AbstractVirtualHostTest extends QpidTestCase verify(store, times(0)).closeMessageStore(); } - public void testDeleteInErrorStateAfterOpen() + public void testDeleteInErrorStateAfterOpen() throws Exception { Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName()); AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java index b17f383217..4f8fe097ab 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java @@ -35,6 +35,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -560,8 +563,9 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase } @Override - protected void activate() + protected ListenableFuture<Void> activate() { + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java index 00b42094b1..c1bd1b0bb8 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java @@ -26,6 +26,9 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -152,7 +155,7 @@ public class ACLFileAccessControlProviderImpl @StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { if(_broker.isManagementMode()) @@ -180,6 +183,7 @@ public class ACLFileAccessControlProviderImpl } } } + return Futures.immediateFuture(null); } @Override @@ -193,17 +197,36 @@ public class ACLFileAccessControlProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); - deleted(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + + setState(State.DELETED); + deleted(); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @Override diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java index a34ac16e80..2a691b3652 100644 --- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java +++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.regex.Pattern; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.Broker; @@ -54,7 +55,9 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase when(_broker.getObjectFactory()).thenReturn(_objectFactory); when(_broker.getModel()).thenReturn(_objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); - when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); + TaskExecutor taskExecutor = new CurrentThreadTaskExecutor(); + taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); } public void testCreateInstanceWhenAclFileIsNotPresent() diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 62a95e9869..28d8a6c88c 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -125,9 +125,8 @@ class ManagementNodeConsumer implements ConsumerImpl } @Override - public ListenableFuture<Void> close() + public void close() { - return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 69920ff488..1a85a24e0b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -39,6 +39,8 @@ import javax.servlet.DispatcherType; import javax.servlet.MultipartConfigElement; import javax.servlet.http.HttpServletRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; @@ -130,7 +132,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void doStart() + private ListenableFuture<Void> doStart() { getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); @@ -148,6 +150,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -206,7 +209,9 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem if(port.getState() != State.ACTIVE) { - port.start(); + + // TODO - RG + port.startAsync(); } Connector connector = null; diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java index 52d7ba33a3..4327292336 100644 --- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; @@ -58,7 +59,9 @@ public class HttpManagementTest extends QpidTestCase when(_broker.getModel()).thenReturn(objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class)); - when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); + TaskExecutor taskExecutor = new TaskExecutorImpl(); + taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index 6c962c2901..06558b9f9a 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -32,6 +32,8 @@ import java.util.Set; import javax.management.InstanceAlreadyExistsException; import javax.management.JMException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -105,7 +107,7 @@ public class JMXManagementPluginImpl } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void doStart() throws JMException, IOException + private ListenableFuture<Void> doStart() throws JMException, IOException { _allowPortActivation = true; Broker<?> broker = getBroker(); @@ -125,7 +127,8 @@ public class JMXManagementPluginImpl registryPort.setPortManager(this); if(port.getState() != State.ACTIVE) { - port.start(); + // TODO - RG + port.startAsync(); } } @@ -135,7 +138,7 @@ public class JMXManagementPluginImpl connectorPort.setPortManager(this); if(port.getState() != State.ACTIVE) { - port.start(); + port.startAsync(); } } @@ -175,6 +178,7 @@ public class JMXManagementPluginImpl _objectRegistry.start(); setState(State.ACTIVE); _allowPortActivation = false; + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 32de06186a..b243769b32 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -386,7 +386,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase assertEquals("Incorrect number of exchanges registered after second recovery", origExchangeCount, _virtualHost.getExchanges().size()); assertNull("Durable exchange was not removed:" + directExchangeName, - _virtualHost.getExchange(directExchangeName)); + _virtualHost.getExchange(directExchangeName)); } /** diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java index 4d06c7b624..4ca3b2ba5c 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java @@ -36,7 +36,6 @@ public class LogRecordsRestTest extends QpidRestTestCase assertNotNull("Message id cannot be null", record.get("id")); assertNotNull("Message timestamp cannot be null", record.get("timestamp")); assertEquals("Unexpected log level", "INFO", record.get("level")); - assertEquals("Unexpected thread", "main", record.get("thread")); assertEquals("Unexpected logger", "qpid.message.broker.ready", record.get("logger")); } |