summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java777
1 files changed, 659 insertions, 118 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index e63638213e..529aa230d4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -41,12 +41,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.util.Strings;
@@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+ @ManagedAttributeField
private State _desiredState;
private boolean _openComplete;
private boolean _openFailed;
@@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final void open()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(openAsync());
+ }
+
+
+ public final ListenableFuture<Void> openAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ return doAttainState(exceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ return Futures.immediateFuture(null);
+ }
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ }
+ });
+
+ }
+
+ protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+ {
+ final SettableFuture<T> returnVal = SettableFuture.create();
+
+ _taskExecutor.submit(new Task<Void>()
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler);
- }
- catch(RuntimeException e)
+
+ @Override
+ public Void execute()
{
- exceptionHandler.handleException(e, this);
+ try
+ {
+ Futures.addCallback(action.call(), new FutureCallback<T>()
+ {
+ @Override
+ public void onSuccess(final T result)
+ {
+ returnVal.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ returnVal.setException(e);
+ }
+ return null;
}
- }
+ });
+
+ return returnVal;
}
+
+
public void registerWithParents()
{
for(ConfiguredObject<?> parent : _parents.values())
@@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- protected void closeChildren()
+ private class ChildCounter
{
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
+
+ protected final ListenableFuture<Void> closeChildren()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
+ }
+ });
+ counter.incrementCount();
+
+
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- child.close();
+ counter.incrementCount();
+ ListenableFuture<Void> close = child.closeAsync();
+ Futures.addCallback(close, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ LOGGER.error("Exception occurred while closing "
+ + child.getClass().getSimpleName()
+ + " : '"
+ + child.getName()
+ + "'", t);
+ // No need to decrement counter as setting the exception will complete the future
+ returnVal.setException(t);
+ }
+ }, MoreExecutors.sameThreadExecutor());
}
});
+ counter.decrementCount();
+
for(Collection<ConfiguredObject<?>> childList : _children.values())
{
childList.clear();
@@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
childNameMap.clear();
}
+ return returnVal;
+ }
+
+ @Override
+ public void close()
+ {
+ doSync(closeAsync());
}
@Override
- public final void close()
+ public final ListenableFuture<Void> closeAsync()
{
- if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- beforeClose();
- closeChildren();
- onClose();
- unregister(false);
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ {
+
+ return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return closeChildren();
+ }
+ }).then(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+ }
+ });
+ }
+ else
+ {
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ return Futures.immediateFuture(null);
+ }
+ }
+ });
- }
}
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
+ return Futures.immediateFuture(null);
}
protected void onClose()
@@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void create()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(createAsync());
+ }
+
+ public final ListenableFuture<Void> createAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
- if(currentUser != null)
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- String currentUserName = currentUser.getName();
- _attributes.put(LAST_UPDATED_BY, currentUserName);
- _attributes.put(CREATED_BY, currentUserName);
- _lastUpdatedBy = currentUserName;
- _createdBy = currentUserName;
- }
- final long currentTime = System.currentTimeMillis();
- _attributes.put(LAST_UPDATED_TIME, currentTime);
- _attributes.put(CREATED_TIME, currentTime);
- _lastUpdatedTime = currentTime;
- _createdTime = currentTime;
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if (currentUser != null)
+ {
+ String currentUserName = currentUser.getName();
+ _attributes.put(LAST_UPDATED_BY, currentUserName);
+ _attributes.put(CREATED_BY, currentUserName);
+ _lastUpdatedBy = currentUserName;
+ _createdBy = currentUserName;
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _attributes.put(CREATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+ _createdTime = currentTime;
+
+ CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+ try
+ {
+ doResolution(true, createExceptionHandler);
+ doValidation(true, createExceptionHandler);
+ validateOnCreate();
+ registerWithParents();
+ }
+ catch (RuntimeException e)
+ {
+ createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
- CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
- try
- {
- doResolution(true, createExceptionHandler);
- doValidation(true, createExceptionHandler);
- validateOnCreate();
- registerWithParents();
- }
- catch(RuntimeException e)
- {
- createExceptionHandler.handleException(e, this);
- }
+ final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+ new CreateExceptionHandler(true);
+
+ try
+ {
+ doCreation(true, unregisteringExceptionHandler);
+ doOpening(true, unregisteringExceptionHandler);
+ return doAttainState(unregisteringExceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
+ }
+ return Futures.immediateFuture(null);
- AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
- try
- {
- doCreation(true, unregisteringExceptionHandler);
- doOpening(true, unregisteringExceptionHandler);
- doAttainState(unregisteringExceptionHandler);
- }
- catch(RuntimeException e)
- {
- unregisteringExceptionHandler.handleException(e, this);
}
- }
+ });
+
}
protected void validateOnCreate()
@@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
}
- private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ attainState().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ catch(RuntimeException e)
+ {
+ try
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+ }
+ });
+ counter.incrementCount();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (child instanceof AbstractConfiguredObject)
{
- AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+ final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
if (configuredObject._dynamicState.get() == DynamicState.OPENED)
{
- try
- {
- configuredObject.doAttainState(exceptionHandler);
- }
- catch (RuntimeException e)
- {
- exceptionHandler.handleException(e, configuredObject);
- }
+ counter.incrementCount();
+ Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(final Object result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ try
+ {
+ if (t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t,
+ configuredObject);
+ }
+ }
+ finally
+ {
+ counter.decrementCount();
+ }
+ }
+ },getTaskExecutor().getExecutor());
+
}
}
}
});
- attainState();
+ counter.decrementCount();
+ return returnVal;
}
protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void attainStateIfOpenedOrReopenFailed()
+ private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
{
if (_openComplete || getDesiredState() == State.DELETED)
{
- attainState();
+ return attainState();
}
else if (_openFailed)
{
- open();
+ return openAsync();
}
+ return Futures.immediateFuture(null);
}
protected void onOpen()
@@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- protected void attainState()
+ protected ListenableFuture<Void> attainState()
{
State currentState = getState();
State desiredState = getDesiredState();
+ ListenableFuture<Void> returnVal;
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
try
{
- stateChangingMethod.invoke(this);
+ returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
}
catch (IllegalAccessException e)
{
@@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
}
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
+ return returnVal;
}
private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- private State setDesiredState(final State desiredState)
+ private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
-
- return runTask(new Task<State>()
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
+ {
+ return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public State execute()
+ final State currentState = getState();
+ if (currentState != state)
{
+ notifyStateChanged(state, currentState);
+ }
- State state = getState();
- if(desiredState == getDesiredState() && desiredState != state)
- {
- attainStateIfOpenedOrReopenFailed();
- final State currentState = getState();
- if (currentState != state)
- {
- notifyStateChanged(state, currentState);
- }
- return currentState;
- }
- else
- {
- setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
- desiredState));
+ }
+ });
+ }
+ else
+ {
+ ConfiguredObject<?> proxyForValidation =
+ createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState));
+ Set<String> desiredStateOnlySet = Collections.unmodifiableSet(
+ Collections.singleton(ConfiguredObject.DESIRED_STATE));
+ authoriseSetAttributes(proxyForValidation, desiredStateOnlySet);
+ validateChange(proxyForValidation, desiredStateOnlySet);
+
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
+
+ return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+
+ }
+ }
+ );
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ }
+ });
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- return desiredState;
- }
- else
- {
- return getState();
- }
- }
- }
- });
}
@Override
@@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void stop()
{
- setDesiredState(State.STOPPED);
+ doSync(setDesiredState(State.STOPPED));
}
public final void delete()
{
- if(getState() == State.UNINITIALIZED)
+ doSync(deleteAsync());
+ }
+
+ protected final <R> R doSync(ListenableFuture<R> async)
+ {
+ try
+ {
+ return async.get();
+ }
+ catch (InterruptedException e)
{
- _desiredState = State.DELETED;
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
}
- setDesiredState(State.DELETED);
+ }
+
+ public final ListenableFuture<Void> deleteAsync()
+ {
+ return setDesiredState(State.DELETED);
+ }
+
+ public final void start()
+ {
+ doSync(startAsync());
+ }
+ public ListenableFuture<Void> startAsync()
+ {
+ return setDesiredState(State.ACTIVE);
}
- public final void start() { setDesiredState(State.ACTIVE); }
protected void deleted()
{
@@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_taskExecutor.run(task);
}
+ @Override
+ public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ doSync(setAttributesAsync(attributes));
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ second.run();
+ returnVal.set(null);
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
+
+ public static interface ChainedListenableFuture extends ListenableFuture<Void>
+ {
+ ChainedListenableFuture then(Runnable r);
+ ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+ }
+
+ public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+ {
+ private final Executor _exector;
+
+ public ChainedSettableFuture(final Executor executor)
+ {
+ _exector = executor;
+ }
+
+ @Override
+ public boolean set(Void value)
+ {
+ return super.set(value);
+ }
+
+ @Override
+ public boolean setException(Throwable throwable)
+ {
+ return super.setException(throwable);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Runnable r)
+ {
+ return doAfter(_exector, this, r);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+ {
+ return doAfter(_exector, this,r);
+ }
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ final ListenableFuture<Void> future = second.call();
+ Futures.addCallback(future, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
@Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
+ final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+ Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
runTask(new VoidTask()
{
@Override
public void execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
+ validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+ changeAttributes(updateAttributes);
}
});
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
}
protected void changeAttributes(final Map<String, Object> attributes)
{
- validateChange(createProxyForValidation(attributes), attributes.keySet());
Collection<String> names = getAttributeNames();
for (String name : names)
{
@@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private static class CloseResult implements FutureResult
+ {
+ private volatile FutureResult _childFutureResult;
+
+ @Override
+ public boolean isComplete()
+ {
+ return _childFutureResult != null && _childFutureResult.isComplete();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ synchronized (this)
+ {
+ while (_childFutureResult == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion();
+
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ synchronized (this)
+ {
+ while (_childFutureResult == null && remaining > 0)
+ {
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ remaining = startTime + timeout - System.currentTimeMillis();
+
+ if(remaining <= 0)
+ {
+ throw new TimeoutException("Completion did not occur within given timeout: " + timeout);
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion(remaining);
+
+ }
+
+ public synchronized void setChildFutureResult(final FutureResult childFutureResult)
+ {
+ _childFutureResult = childFutureResult;
+ notifyAll();
+ }
+ }
+
private static class AttributeGettingHandler implements InvocationHandler
{
@@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (source.getState() != State.DELETED)
{
- source.delete();
+ // TODO - RG - This isn't right :-(
+ source.deleteAsync();
}
}
finally