diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java | 777 |
1 files changed, 659 insertions, 118 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e63638213e..529aa230d4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -41,12 +41,23 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.Strings; @@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this); - @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" ) + @ManagedAttributeField private State _desiredState; private boolean _openComplete; private boolean _openFailed; @@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public final void open() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(openAsync()); + } + + + public final ListenableFuture<Void> openAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + _openFailed = false; + OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); + try + { + doResolution(true, exceptionHandler); + doValidation(true, exceptionHandler); + doOpening(true, exceptionHandler); + return doAttainState(exceptionHandler); + } + catch (RuntimeException e) + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + return Futures.immediateFuture(null); + } + } + else + { + return Futures.immediateFuture(null); + } + + } + }); + + } + + protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action) + { + final SettableFuture<T> returnVal = SettableFuture.create(); + + _taskExecutor.submit(new Task<Void>() { - _openFailed = false; - OpenExceptionHandler exceptionHandler = new OpenExceptionHandler(); - try - { - doResolution(true, exceptionHandler); - doValidation(true, exceptionHandler); - doOpening(true, exceptionHandler); - doAttainState(exceptionHandler); - } - catch(RuntimeException e) + + @Override + public Void execute() { - exceptionHandler.handleException(e, this); + try + { + Futures.addCallback(action.call(), new FutureCallback<T>() + { + @Override + public void onSuccess(final T result) + { + returnVal.set(result); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }); + } + catch (Exception e) + { + returnVal.setException(e); + } + return null; } - } + }); + + return returnVal; } + + public void registerWithParents() { for(ConfiguredObject<?> parent : _parents.values()) @@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - protected void closeChildren() + private class ChildCounter { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } + + protected final ListenableFuture<Void> closeChildren() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() ); + + } + }); + counter.incrementCount(); + + applyToChildren(new Action<ConfiguredObject<?>>() { @Override public void performAction(final ConfiguredObject<?> child) { - child.close(); + counter.incrementCount(); + ListenableFuture<Void> close = child.closeAsync(); + Futures.addCallback(close, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + LOGGER.error("Exception occurred while closing " + + child.getClass().getSimpleName() + + " : '" + + child.getName() + + "'", t); + // No need to decrement counter as setting the exception will complete the future + returnVal.setException(t); + } + }, MoreExecutors.sameThreadExecutor()); } }); + counter.decrementCount(); + for(Collection<ConfiguredObject<?>> childList : _children.values()) { childList.clear(); @@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im childNameMap.clear(); } + return returnVal; + } + + @Override + public void close() + { + doSync(closeAsync()); } @Override - public final void close() + public final ListenableFuture<Void> closeAsync() { - if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - beforeClose(); - closeChildren(); - onClose(); - unregister(false); + @Override + public ListenableFuture<Void> call() throws Exception + { + LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + { + + return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + return closeChildren(); + } + }).then(new Runnable() + { + @Override + public void run() + { + onClose(); + unregister(false); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + } + }); + } + else + { + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); + + return Futures.immediateFuture(null); + } + } + }); - } } - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { + return Futures.immediateFuture(null); } protected void onClose() @@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void create() { - if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + doSync(createAsync()); + } + + public final ListenableFuture<Void> createAsync() + { + return doOnConfigThread(new Callable<ListenableFuture<Void>>() { - final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); - if(currentUser != null) + @Override + public ListenableFuture<Void> call() throws Exception { - String currentUserName = currentUser.getName(); - _attributes.put(LAST_UPDATED_BY, currentUserName); - _attributes.put(CREATED_BY, currentUserName); - _lastUpdatedBy = currentUserName; - _createdBy = currentUserName; - } - final long currentTime = System.currentTimeMillis(); - _attributes.put(LAST_UPDATED_TIME, currentTime); - _attributes.put(CREATED_TIME, currentTime); - _lastUpdatedTime = currentTime; - _createdTime = currentTime; + if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) + { + final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); + if (currentUser != null) + { + String currentUserName = currentUser.getName(); + _attributes.put(LAST_UPDATED_BY, currentUserName); + _attributes.put(CREATED_BY, currentUserName); + _lastUpdatedBy = currentUserName; + _createdBy = currentUserName; + } + final long currentTime = System.currentTimeMillis(); + _attributes.put(LAST_UPDATED_TIME, currentTime); + _attributes.put(CREATED_TIME, currentTime); + _lastUpdatedTime = currentTime; + _createdTime = currentTime; + + CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); + try + { + doResolution(true, createExceptionHandler); + doValidation(true, createExceptionHandler); + validateOnCreate(); + registerWithParents(); + } + catch (RuntimeException e) + { + createExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } - CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler(); - try - { - doResolution(true, createExceptionHandler); - doValidation(true, createExceptionHandler); - validateOnCreate(); - registerWithParents(); - } - catch(RuntimeException e) - { - createExceptionHandler.handleException(e, this); - } + final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = + new CreateExceptionHandler(true); + + try + { + doCreation(true, unregisteringExceptionHandler); + doOpening(true, unregisteringExceptionHandler); + return doAttainState(unregisteringExceptionHandler); + } + catch (RuntimeException e) + { + unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this); + } + } + return Futures.immediateFuture(null); - AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true); - try - { - doCreation(true, unregisteringExceptionHandler); - doOpening(true, unregisteringExceptionHandler); - doAttainState(unregisteringExceptionHandler); - } - catch(RuntimeException e) - { - unregisteringExceptionHandler.handleException(e, this); } - } + }); + } protected void validateOnCreate() @@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { } - private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) + private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler) { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + try + { + attainState().addListener(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + } + catch(RuntimeException e) + { + try + { + exceptionHandler.handleException(e, AbstractConfiguredObject.this); + returnVal.set(null); + } + catch(Throwable t) + { + returnVal.setException(t); + } + } + } + }); + counter.incrementCount(); applyToChildren(new Action<ConfiguredObject<?>>() { @Override @@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (child instanceof AbstractConfiguredObject) { - AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; + final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child; if (configuredObject._dynamicState.get() == DynamicState.OPENED) { - try - { - configuredObject.doAttainState(exceptionHandler); - } - catch (RuntimeException e) - { - exceptionHandler.handleException(e, configuredObject); - } + counter.incrementCount(); + Futures.addCallback(configuredObject.doAttainState(exceptionHandler), + new FutureCallback() + { + @Override + public void onSuccess(final Object result) + { + counter.decrementCount(); + } + + @Override + public void onFailure(final Throwable t) + { + try + { + if (t instanceof RuntimeException) + { + exceptionHandler.handleException((RuntimeException) t, + configuredObject); + } + } + finally + { + counter.decrementCount(); + } + } + },getTaskExecutor().getExecutor()); + } } } }); - attainState(); + counter.decrementCount(); + return returnVal; } protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler) @@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - private void attainStateIfOpenedOrReopenFailed() + private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed() { if (_openComplete || getDesiredState() == State.DELETED) { - attainState(); + return attainState(); } else if (_openFailed) { - open(); + return openAsync(); } + return Futures.immediateFuture(null); } protected void onOpen() @@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - protected void attainState() + protected ListenableFuture<Void> attainState() { State currentState = getState(); State desiredState = getDesiredState(); + ListenableFuture<Void> returnVal; if(currentState != desiredState) { Method stateChangingMethod = getStateChangeMethod(currentState, desiredState); @@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { try { - stateChangingMethod.invoke(this); + returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this); } catch (IllegalAccessException e) { @@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying); } } + else + { + returnVal = Futures.immediateFuture(null); + } } + else + { + returnVal = Futures.immediateFuture(null); + } + return returnVal; } private Method getStateChangeMethod(final State currentState, final State desiredState) @@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - private State setDesiredState(final State desiredState) + private ListenableFuture<Void> setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - - return runTask(new Task<State>() + return doOnConfigThread(new Callable<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> call() throws Exception + { + final State state = getState(); + final State currentDesiredState = getDesiredState(); + if(desiredState == currentDesiredState && desiredState != state) + { + return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable() + { + @Override + public void run() { - @Override - public State execute() + final State currentState = getState(); + if (currentState != state) { + notifyStateChanged(state, currentState); + } - State state = getState(); - if(desiredState == getDesiredState() && desiredState != state) - { - attainStateIfOpenedOrReopenFailed(); - final State currentState = getState(); - if (currentState != state) - { - notifyStateChanged(state, currentState); - } - return currentState; - } - else - { - setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE, - desiredState)); + } + }); + } + else + { + ConfiguredObject<?> proxyForValidation = + createProxyForValidation(Collections.<String, Object>singletonMap( + ConfiguredObject.DESIRED_STATE, + desiredState)); + Set<String> desiredStateOnlySet = Collections.unmodifiableSet( + Collections.singleton(ConfiguredObject.DESIRED_STATE)); + authoriseSetAttributes(proxyForValidation, desiredStateOnlySet); + validateChange(proxyForValidation, desiredStateOnlySet); + + if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState)) + { + attributeSet(ConfiguredObject.DESIRED_STATE, + currentDesiredState, + desiredState); + + return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable() + { + @Override + public void run() + { + if (getState() == desiredState) + { + notifyStateChanged(state, desiredState); + } + + } + } + ); + } + else + { + return Futures.immediateFuture(null); + } + } + + } + }); - if (getState() == desiredState) - { - notifyStateChanged(state, desiredState); - return desiredState; - } - else - { - return getState(); - } - } - } - }); } @Override @@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public final void stop() { - setDesiredState(State.STOPPED); + doSync(setDesiredState(State.STOPPED)); } public final void delete() { - if(getState() == State.UNINITIALIZED) + doSync(deleteAsync()); + } + + protected final <R> R doSync(ListenableFuture<R> async) + { + try + { + return async.get(); + } + catch (InterruptedException e) { - _desiredState = State.DELETED; + throw new ServerScopedRuntimeException(e); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) + { + throw (RuntimeException) cause; + } + else if(cause instanceof Error) + { + throw (Error) cause; + } + else if(cause != null) + { + throw new ServerScopedRuntimeException(cause); + } + else + { + throw new ServerScopedRuntimeException(e); + } + } - setDesiredState(State.DELETED); + } + + public final ListenableFuture<Void> deleteAsync() + { + return setDesiredState(State.DELETED); + } + + public final void start() + { + doSync(startAsync()); + } + public ListenableFuture<Void> startAsync() + { + return setDesiredState(State.ACTIVE); } - public final void start() { setDesiredState(State.ACTIVE); } protected void deleted() { @@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _taskExecutor.run(task); } + @Override + public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + doSync(setAttributesAsync(attributes)); + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + second.run(); + returnVal.set(null); + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + + public static interface ChainedListenableFuture extends ListenableFuture<Void> + { + ChainedListenableFuture then(Runnable r); + ChainedListenableFuture then(Callable<ListenableFuture<Void>> r); + } + + public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture + { + private final Executor _exector; + + public ChainedSettableFuture(final Executor executor) + { + _exector = executor; + } + + @Override + public boolean set(Void value) + { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public ChainedListenableFuture then(final Runnable r) + { + return doAfter(_exector, this, r); + } + + @Override + public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r) + { + return doAfter(_exector, this,r); + } + } + + protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + { + final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); + Futures.addCallback(first, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + final ListenableFuture<Void> future = second.call(); + Futures.addCallback(future, new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + returnVal.set(null); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException + public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { + final Map<String,Object> updateAttributes = new HashMap<>(attributes); + Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE); runTask(new VoidTask() { @Override public void execute() { authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); - changeAttributes(attributes); + validateChange(createProxyForValidation(attributes), attributes.keySet()); + + changeAttributes(updateAttributes); } }); + if(desiredState != null) + { + State state; + if(desiredState instanceof State) + { + state = (State)desiredState; + } + else if(desiredState instanceof String) + { + state = State.valueOf((String)desiredState); + } + else + { + throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State"); + } + return setDesiredState(state); + } + else + { + return Futures.immediateFuture(null); + } } protected void changeAttributes(final Map<String, Object> attributes) { - validateChange(createProxyForValidation(attributes), attributes.keySet()); Collection<String> names = getAttributeNames(); for (String name : names) { @@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private static class CloseResult implements FutureResult + { + private volatile FutureResult _childFutureResult; + + @Override + public boolean isComplete() + { + return _childFutureResult != null && _childFutureResult.isComplete(); + } + + @Override + public void waitForCompletion() + { + synchronized (this) + { + while (_childFutureResult == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + _childFutureResult.waitForCompletion(); + + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + synchronized (this) + { + while (_childFutureResult == null && remaining > 0) + { + try + { + wait(remaining); + } + catch (InterruptedException e) + { + + } + remaining = startTime + timeout - System.currentTimeMillis(); + + if(remaining <= 0) + { + throw new TimeoutException("Completion did not occur within given timeout: " + timeout); + } + } + } + _childFutureResult.waitForCompletion(remaining); + + } + + public synchronized void setChildFutureResult(final FutureResult childFutureResult) + { + _childFutureResult = childFutureResult; + notifyAll(); + } + } + private static class AttributeGettingHandler implements InvocationHandler { @@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if (source.getState() != State.DELETED) { - source.delete(); + // TODO - RG - This isn't right :-( + source.deleteAsync(); } } finally |