diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model')
20 files changed, 1069 insertions, 202 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e63638213e..529aa230d4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -41,12 +41,23 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.Strings; @@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this); - @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" ) + @ManagedAttributeField private State _desiredState; private boolean _openComplete; private boolean _openFailed; @@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public final void open() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(openAsync()); + } + + + public final ListenableFuture<Void> openAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + _openFailed = false; + OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); + try + { + doResolution(true, exceptionHandler); + doValidation(true, exceptionHandler); + doOpening(true, exceptionHandler); + return doAttainState(exceptionHandler); + } + catch (RuntimeException e) + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + return Futures.immediateFuture(null); + } + } + else + { + return Futures.immediateFuture(null); + } + + } + }); + + } + + protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action) + { + final SettableFuture<T> returnVal = SettableFuture.create(); + + _taskExecutor.submit(new Task<Void>() { - _openFailed = false; - OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); - try - { - doResolution(true, exceptionHandler); - doValidation(true, exceptionHandler); - doOpening(true, exceptionHandler); - doAttainState(exceptionHandler); - } - catch(RuntimeException e) + + @Override + public Void execute() { - exceptionHandler.handleException(e, this); + try + { + Futures.addCallback(action.call(), new FutureCallback<T>() + { + @Override + public void onSuccess(final T result) + { + returnVal.set(result); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }); + } + catch (Exception e) + { + returnVal.setException(e); + } + return null; } - } + }); + + return returnVal; } + + public void registerWithParents() { for(ConfiguredObject<?> parent : _parents.values()) @@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - protected void closeChildren() + private class ChildCounter { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } + + protected final ListenableFuture<Void> closeChildren() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() ); + + } + }); + counter.incrementCount(); + + applyToChildren(new Action<ConfiguredObject<?>>() { @Override public void performAction(final ConfiguredObject<?> child) { - child.close(); + counter.incrementCount(); + ListenableFuture<Void> close = child.closeAsync(); + Futures.addCallback(close, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + LOGGER.error("Exception occurred while closing " + + child.getClass().getSimpleName() + + " : '" + + child.getName() + + "'", t); + // No need to decrement counter as setting the exception will complete the future + returnVal.setException(t); + } + }, MoreExecutors.sameThreadExecutor()); } }); + counter.decrementCount(); + for(Collection<ConfiguredObject<?>> childList : _children.values()) { childList.clear(); @@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im childNameMap.clear(); } + return returnVal; + } + + @Override + public void close() + { + doSync(closeAsync()); } @Override - public final void close() + public final ListenableFuture<Void> closeAsync() { - if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - beforeClose(); - closeChildren(); - onClose(); - unregister(false); + @Override + public ListenableFuture<Void> call() throws Exception + { + LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + { + + return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + return closeChildren(); + } + }).then(new Runnable() + { + @Override + public void run() + { + onClose(); + unregister(false); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + } + }); + } + else + { + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + return Futures.immediateFuture(null); + } + } + }); - } } - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { + return Futures.immediateFuture(null); } protected void onClose() @@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void create() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(createAsync()); + } + + public final ListenableFuture<Void> createAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); - if(currentUser != null) + @Override + public ListenableFuture<Void> call() throws Exception { - String currentUserName = currentUser.getName(); - _attributes.put(LAST_UPDATED_BY, currentUserName); - _attributes.put(CREATED_BY, currentUserName); - _lastUpdatedBy = currentUserName; - _createdBy = currentUserName; - } - final long currentTime = System.currentTimeMillis(); - _attributes.put(LAST_UPDATED_TIME, currentTime); - _attributes.put(CREATED_TIME, currentTime); - _lastUpdatedTime = currentTime; - _createdTime = currentTime; + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); + if (currentUser != null) + { + String currentUserName = currentUser.getName(); + _attributes.put(LAST_UPDATED_BY, currentUserName); + _attributes.put(CREATED_BY, currentUserName); + _lastUpdatedBy = currentUserName; + _createdBy = currentUserName; + } + final long currentTime = System.currentTimeMillis(); + _attributes.put(LAST_UPDATED_TIME, currentTime); + _attributes.put(CREATED_TIME, currentTime); + _lastUpdatedTime = currentTime; + _createdTime = currentTime; + + CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); + try + { + doResolution(true, createExceptionHandler); + doValidation(true, createExceptionHandler); + validateOnCreate(); + registerWithParents(); + } + catch (RuntimeException e) + { + createExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } - CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); - try - { - doResolution(true, createExceptionHandler); - doValidation(true, createExceptionHandler); - validateOnCreate(); - registerWithParents(); - } - catch(RuntimeException e) - { - createExceptionHandler.handleException(e, this); - } + final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = + new CreateExceptionHandler(true); + + try + { + doCreation(true, unregisteringExceptionHandler); + doOpening(true, unregisteringExceptionHandler); + return doAttainState(unregisteringExceptionHandler); + } + catch (RuntimeException e) + { + unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } + } + return Futures.immediateFuture(null); - AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true); - try - { - doCreation(true, unregisteringExceptionHandler); - doOpening(true, unregisteringExceptionHandler); - doAttainState(unregisteringExceptionHandler); - } - catch(RuntimeException e) - { - unregisteringExceptionHandler.handleException(e, this); } - } + }); + } protected void validateOnCreate() @@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { } - private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) + private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + try + { + attainState().addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + } + catch(RuntimeException e) + { + try + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + } + }); + counter.incrementCount(); applyToChildren(new Action<ConfiguredObject<?>>() { @Override @@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (child instanceof AbstractConfiguredObject) { - AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; + final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; if (configuredObject._dynamicState.get() == DynamicState.OPENED) { - try - { - configuredObject.doAttainState(exceptionHandler); - } - catch (RuntimeException e) - { - exceptionHandler.handleException(e, configuredObject); - } + counter.incrementCount(); + Futures.addCallback(configuredObject.doAttainState(exceptionHandler), + new FutureCallback() + { + @Override + public void onSuccess(final Object result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + try + { + if (t instanceof RuntimeException) + { + exceptionHandler.handleException((RuntimeException) t, + configuredObject); + } + } + finally + { + counter.decrementCount(); + } + } + },getTaskExecutor().getExecutor()); + } } } }); - attainState(); + counter.decrementCount(); + return returnVal; } protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler) @@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void attainStateIfOpenedOrReopenFailed() + private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed() { if (_openComplete || getDesiredState() == State.DELETED) { - attainState(); + return attainState(); } else if (_openFailed) { - open(); + return openAsync(); } + return Futures.immediateFuture(null); } protected void onOpen() @@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - protected void attainState() + protected ListenableFuture<Void> attainState() { State currentState = getState(); State desiredState = getDesiredState(); + ListenableFuture<Void> returnVal; if(currentState != desiredState) { Method stateChangingMethod = getStateChangeMethod(currentState, desiredState); @@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { try { - stateChangingMethod.invoke(this); + returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this); } catch (IllegalAccessException e) { @@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying); } } + else + { + returnVal = Futures.immediateFuture(null); + } } + else + { + returnVal = Futures.immediateFuture(null); + } + return returnVal; } private Method getStateChangeMethod(final State currentState, final State desiredState) @@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - private State setDesiredState(final State desiredState) + private ListenableFuture<Void> setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - - return runTask(new Task<State>() + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + final State state = getState(); + final State currentDesiredState = getDesiredState(); + if(desiredState == currentDesiredState && desiredState != state) + { + return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable() + { + @Override + public void run() { - @Override - public State execute() + final State currentState = getState(); + if (currentState != state) { + notifyStateChanged(state, currentState); + } - State state = getState(); - if(desiredState == getDesiredState() && desiredState != state) - { - attainStateIfOpenedOrReopenFailed(); - final State currentState = getState(); - if (currentState != state) - { - notifyStateChanged(state, currentState); - } - return currentState; - } - else - { - setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE, - desiredState)); + } + }); + } + else + { + ConfiguredObject<?> proxyForValidation = + createProxyForValidation(Collections.<String, Object>singletonMap( + ConfiguredObject.DESIRED_STATE, + desiredState)); + Set<String> desiredStateOnlySet = Collections.unmodifiableSet( + Collections.singleton(ConfiguredObject.DESIRED_STATE)); + authoriseSetAttributes(proxyForValidation, desiredStateOnlySet); + validateChange(proxyForValidation, desiredStateOnlySet); + + if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) + { + attributeSet(ConfiguredObject.DESIRED_STATE, + currentDesiredState, + desiredState); + + return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable() + { + @Override + public void run() + { + if (getState() == desiredState) + { + notifyStateChanged(state, desiredState); + } + + } + } + ); + } + else + { + return Futures.immediateFuture(null); + } + } + + } + }); - if (getState() == desiredState) - { - notifyStateChanged(state, desiredState); - return desiredState; - } - else - { - return getState(); - } - } - } - }); } @Override @@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void stop() { - setDesiredState(State.STOPPED); + doSync(setDesiredState(State.STOPPED)); } public final void delete() { - if(getState() == State.UNINITIALIZED) + doSync(deleteAsync()); + } + + protected final <R> R doSync(ListenableFuture<R> async) + { + try + { + return async.get(); + } + catch (InterruptedException e) { - _desiredState = State.DELETED; + throw new ServerScopedRuntimeException(e); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else if(cause instanceof Error) + { + throw (Error) cause; + } + else if(cause != null) + { + throw new ServerScopedRuntimeException(cause); + } + else + { + throw new ServerScopedRuntimeException(e); + } + } - setDesiredState(State.DELETED); + } + + public final ListenableFuture<Void> deleteAsync() + { + return setDesiredState(State.DELETED); + } + + public final void start() + { + doSync(startAsync()); + } + public ListenableFuture<Void> startAsync() + { + return setDesiredState(State.ACTIVE); } - public final void start() { setDesiredState(State.ACTIVE); } protected void deleted() { @@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _taskExecutor.run(task); } + @Override + public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + doSync(setAttributesAsync(attributes)); + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + second.run(); + returnVal.set(null); + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + + public static interface ChainedListenableFuture extends ListenableFuture<Void> + { + ChainedListenableFuture then(Runnable r); + ChainedListenableFuture then(Callable<ListenableFuture<Void>> r); + } + + public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture + { + private final Executor _exector; + + public ChainedSettableFuture(final Executor executor) + { + _exector = executor; + } + + @Override + public boolean set(Void value) + { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public ChainedListenableFuture then(final Runnable r) + { + return doAfter(_exector, this, r); + } + + @Override + public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r) + { + return doAfter(_exector, this,r); + } + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + final ListenableFuture<Void> future = second.call(); + Futures.addCallback(future, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + returnVal.set(null); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { + final Map<String,Object> updateAttributes = new HashMap<>(attributes); + Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE); runTask(new VoidTask() { @Override public void execute() { authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); - changeAttributes(attributes); + validateChange(createProxyForValidation(attributes), attributes.keySet()); + + changeAttributes(updateAttributes); } }); + if(desiredState != null) + { + State state; + if(desiredState instanceof State) + { + state = (State)desiredState; + } + else if(desiredState instanceof String) + { + state = State.valueOf((String)desiredState); + } + else + { + throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State"); + } + return setDesiredState(state); + } + else + { + return Futures.immediateFuture(null); + } } protected void changeAttributes(final Map<String, Object> attributes) { - validateChange(createProxyForValidation(attributes), attributes.keySet()); Collection<String> names = getAttributeNames(); for (String name : names) { @@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private static class CloseResult implements FutureResult + { + private volatile FutureResult _childFutureResult; + + @Override + public boolean isComplete() + { + return _childFutureResult != null && _childFutureResult.isComplete(); + } + + @Override + public void waitForCompletion() + { + synchronized (this) + { + while (_childFutureResult == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + _childFutureResult.waitForCompletion(); + + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + synchronized (this) + { + while (_childFutureResult == null && remaining > 0) + { + try + { + wait(remaining); + } + catch (InterruptedException e) + { + + } + remaining = startTime + timeout - System.currentTimeMillis(); + + if(remaining <= 0) + { + throw new TimeoutException("Completion did not occur within given timeout: " + timeout); + } + } + } + _childFutureResult.waitForCompletion(remaining); + + } + + public synchronized void setChildFutureResult(final FutureResult childFutureResult) + { + _childFutureResult = childFutureResult; + notifyAll(); + } + } + private static class AttributeGettingHandler implements InvocationHandler { @@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (source.getState() != State.DELETED) { - source.delete(); + // TODO - RG - This isn't right :-( + source.deleteAsync(); } } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java index 5bf5e337ad..f97d2dfe14 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.model; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectDependency; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf return instance; } + + @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + final SettableFuture<X> returnVal = SettableFuture.create(); + final X instance = createInstance(attributes, parents); + final ListenableFuture<Void> createFuture = instance.createAsync(); + createFuture.addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(instance); + } + }, MoreExecutors.sameThreadExecutor()); + return returnVal; + } + protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents); public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java index b421c5aaf1..c6ac7d4073 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java @@ -31,6 +31,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; @@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - protected void activate() + protected ListenableFuture<Void> activate() { final EventLogger eventLogger = _eventLogger; - EventLogger startupLogger; + final EventLogger startupLogger; if (isStartupLoggedToSystemOut()) { //Create the composite (logging+SystemOut MessageLogger to be used during startup @@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>> BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this); upgrader.perform(); - Broker broker = getBroker(); + final Broker broker = getBroker(); broker.setEventLogger(startupLogger); - broker.open(); - - if (broker.getState() == State.ACTIVE) - { - startupLogger.message(BrokerMessages.READY()); - broker.setEventLogger(eventLogger); - } - + final SettableFuture<Void> returnVal = SettableFuture.create(); + broker.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + + if (broker.getState() == State.ACTIVE) + { + startupLogger.message(BrokerMessages.READY()); + broker.setEventLogger(eventLogger); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + + return returnVal; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java index 944ed97ccc..c56698c60c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java @@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X> @ManagedStatistic long getMatches(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java new file mode 100644 index 0000000000..5e9d794e14 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.model; + + +public interface CloseFuture +{ + public void runWhenComplete(final Runnable closeRunnable); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 2d60879861..d2ab317f0e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -236,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObject... otherParents); void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; + Class<? extends ConfiguredObject> getCategoryClass(); Class<? extends ConfiguredObject> getTypeClass(); @@ -248,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> ConfiguredObjectRecord asObjectRecord(); void open(); + ListenableFuture<Void> openAsync(); void close(); + ListenableFuture<Void> closeAsync(); + + ListenableFuture<Void> deleteAsync(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java index 7d4023862b..ed7c841344 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; @@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory <X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents); + <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 5026df0e19..82da0fd206 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory return factory.create(this, attributes, parents); } + + @Override + public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes); + + return factory.createAsync(this, attributes, parents); + } + + @Override public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String, Object> attributes) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index d0c6fb041e..a93e6a602f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -40,6 +40,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.plugin.ConfiguredObjectRegistration; @@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry { if(m.isAnnotationPresent(StateTransition.class)) { - if(m.getParameterTypes().length == 0) + if(ListenableFuture.class.isAssignableFrom(m.getReturnType())) { - m.setAccessible(true); - StateTransition annotation = m.getAnnotation(StateTransition.class); + if (m.getParameterTypes().length == 0) + { + m.setAccessible(true); + StateTransition annotation = m.getAnnotation(StateTransition.class); + + for (State state : annotation.currentState()) + { + addStateTransition(state, annotation.desiredState(), m, map); + } - for(State state : annotation.currentState()) + } + else { - addStateTransition(state, annotation.desiredState(), m, map); + throw new ServerScopedRuntimeException( + "A state transition method must have no arguments. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } - } else { - throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria."); + throw new ServerScopedRuntimeException( + "A state transition method must return a ListenableFuture. Method " + + m.getName() + + " on " + + clazz.getName() + + " does not meet this criteria."); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index b28441438d..1c245363a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> //children Collection<Session> getSessions(); - void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index 7318a58640..999a3594b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + @ManagedObject public interface Port<X extends Port<X>> extends ConfiguredObject<X> { @@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> void start(); + ListenableFuture<Void> startAsync(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index cc758ba7c9..c2338c08d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void stop(); - void delete(); - String getRedirectHost(AmqpPort<?> port); public static interface Transaction diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index be1d6ebf59..036c4d7716 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -35,6 +35,9 @@ import java.util.regex.Pattern; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.common.QpidProperties; @@ -234,13 +237,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if(_parent.isManagementMode()) { - _managementModeAuthenticationProvider.open(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + _managementModeAuthenticationProvider.openAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + activateWithoutManagementMode(); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } + else + { + activateWithoutManagementMode(); + return Futures.immediateFuture(null); + } + } + private void activateWithoutManagementMode() + { boolean hasBrokerAnyErroredChildren = false; for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass())) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index e03904789d..8bcbba9ac4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,10 +27,13 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Action _underlyingConnectionDeleteTask; private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); private AMQConnectionModel _underlyingConnection; + private final AtomicBoolean _closing = new AtomicBoolean(); public ConnectionAdapter(final AMQConnectionModel conn) { @@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - closeUnderlyingConnection(); - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + asyncClose().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; + } + + @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + return asyncClose(); + + } + + private ListenableFuture<Void> asyncClose() + { + final SettableFuture<Void> closeFuture = SettableFuture.create(); + + _underlyingConnection.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + closeFuture.set(null); + } + }); + + _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + return closeFuture; } @Override protected void onClose() { - closeUnderlyingConnection(); } @Override @@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection // SessionAdapter installs delete task to cause session model object to delete } - private void closeUnderlyingConnection() + + private static class ConnectionCloseFuture implements CloseFuture { - if (_underlyingClosed.compareAndSet(false, true)) + private boolean _closed; + + public synchronized void connectionClosed() { - _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); - try + _closed = true; + notifyAll(); + + } + + @Override + public void runWhenComplete(final Runnable closeRunnable) + { + if (_closed ) { - _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + closeRunnable.run(); } - catch (Exception e) + else { - LOGGER.warn("Exception closing connection " - + _underlyingConnection.getConnectionId() - + " from " - + _underlyingConnection.getRemoteAddressString(), e); - } + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + synchronized (ConnectionCloseFuture.this) + { + while (!_closed) + { + try + { + ConnectionCloseFuture.this.wait(); + } + catch (InterruptedException e) + { + } + } + + closeRunnable.run(); + } + } + }); + + t.setDaemon(true); + t.start(); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index fda8a6f2e9..1a119be32d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -31,6 +31,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -145,7 +148,8 @@ public class FileBasedGroupProviderImpl GroupAdapter groupAdapter = new GroupAdapter(attrMap); principals.add(groupAdapter); groupAdapter.registerWithParents(); - groupAdapter.open(); + // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread + groupAdapter.openAsync(); } } @@ -261,7 +265,7 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_groupDatabase != null) { @@ -278,29 +282,48 @@ public class FileBasedGroupProviderImpl throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath())); } } + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - File file = new File(getPath()); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } - - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + File file = new File(getPath()); + if (file.exists()) + { + if (!file.delete()) + { + throw new IllegalConfigurationException("Cannot delete group file"); + } + } + + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } public Set<Principal> getGroupPrincipalsForUser(String username) @@ -352,9 +375,10 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -371,7 +395,8 @@ public class FileBasedGroupProviderImpl attrMap.put(GroupMember.NAME, principal.getName()); GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap); groupMemberAdapter.registerWithParents(); - groupMemberAdapter.open(); + // todo - this will be safe, but the synchronous open should not be called from the management thread + groupMemberAdapter.openAsync(); members.add(groupMemberAdapter); } _groupPrincipal = new GroupPrincipal(getName()); @@ -432,11 +457,12 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { _groupDatabase.removeGroup(getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -494,17 +520,19 @@ public class FileBasedGroupProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java index 2b77b0d2a9..500df8cb87 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java @@ -37,16 +37,17 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.util.BaseAction; -import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.codehaus.jackson.type.TypeReference; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; public class FileSystemPreferencesProviderImpl @@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_store != null) { @@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl { throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() ); } + return Futures.immediateFuture(null); } @Override @@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl } @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED) - private void doQuiesce() + private ListenableFuture<Void> doQuiesce() { if(_store != null) { _store.close(); } setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + if(_store != null) + { + _store.close(); + _store.delete(); + deleted(); + _authenticationProvider.setPreferencesProvider(null); + + } + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - if(_store != null) - { - _store.close(); - _store.delete(); - deleted(); - _authenticationProvider.setPreferencesProvider(null); + return returnVal; - } - setState(State.DELETED); } @StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE ) - private void restart() + private ListenableFuture<Void> restart() { if (_store == null) { @@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl _store.open(); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 7c9b439e93..cb412e8d41 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 791bbe4dd3..0e6f18a70a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -27,6 +27,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -226,14 +229,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + setState(State.DELETED); + returnVal.set(null); + + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -244,12 +257,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo setState(State.ERRORED); throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e); } + return Futures.immediateFuture(null); } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 43cb5f0c62..350f137a04 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.BrokerProperties; @@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< private final Broker<?> _broker; private AcceptingTransport _transport; + private final AtomicBoolean _closing = new AtomicBoolean(); + private final SettableFuture _noConnectionsRemain = SettableFuture.create(); @ManagedObjectFactoryConstructor public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker) @@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< } @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + if (_connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + + return _noConnectionsRemain; + } + + @Override protected void onClose() { if (_transport != null) @@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< { _broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); } + + _transport.close(); } } @@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< _connectionCountWarningGiven.compareAndSet(true,false); } + if (_closing.get() && _connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + return openConnections; } @@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< @Override public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) { - return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections; + return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections ); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java index 870621f292..5c3000db4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) |