summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java463
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java70
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java10
18 files changed, 650 insertions, 171 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 2269999e1d..57eb16c0be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -43,12 +43,14 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -169,7 +171,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+ @ManagedAttributeField
private State _desiredState;
private boolean _openComplete;
private boolean _openFailed;
@@ -446,24 +448,58 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final void open()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(openAsync());
+ }
+
+
+ public final ListenableFuture<Void> openAsync()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _taskExecutor.run(new VoidTask()
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler);
- }
- catch(RuntimeException e)
+
+ @Override
+ public void execute()
{
- exceptionHandler.handleException(e, this);
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ doAttainState(exceptionHandler).addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, MoreExecutors.sameThreadExecutor()
+ );
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ }
+ else
+ {
+ returnVal.set(null);
+ }
}
- }
+ });
+ return returnVal;
+
}
+
+
public void registerWithParents()
{
for(ConfiguredObject<?> parent : _parents.values())
@@ -475,7 +511,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private static class ChildCounter
+ private class ChildCounter
{
private final AtomicInteger _count = new AtomicInteger();
private final Runnable _task;
@@ -501,8 +537,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected final ListenableFuture<Void> closeChildren()
{
- LOGGER.debug("KWDEBUG closing children");
-
final SettableFuture<Void> returnVal = SettableFuture.create();
final ChildCounter counter = new ChildCounter(new Runnable()
{
@@ -510,6 +544,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public void run()
{
returnVal.set(null);
+ LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
}
});
counter.incrementCount();
@@ -521,7 +557,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public void performAction(final ConfiguredObject<?> child)
{
counter.incrementCount();
- ListenableFuture<Void> close = child.close();
+ ListenableFuture<Void> close = child.closeAsync();
close.addListener(new Runnable()
{
@Override
@@ -554,8 +590,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
@Override
- public final ListenableFuture<Void> close()
+ public void close()
+ {
+ doSync(closeAsync());
+ }
+
+ @Override
+ public final ListenableFuture<Void> closeAsync()
{
+ LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
final SettableFuture<Void> returnVal = SettableFuture.create();
@@ -577,6 +620,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
onClose();
unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
returnVal.set(null);
}
}, getTaskExecutor().getExecutor());
@@ -591,8 +635,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public void run()
{
+
onClose();
unregister(false);
+ LOGGER.debug("Closed "
+ + AbstractConfiguredObject.this.getClass().getSimpleName()
+ + " : "
+ + getName());
returnVal.set(null);
}
}, getTaskExecutor().getExecutor());
@@ -604,6 +653,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
else
{
+ LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
+
return Futures.immediateFuture(null);
}
}
@@ -619,48 +670,88 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void create()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(createAsync());
+ }
+
+ public final ListenableFuture<Void> createAsync()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _taskExecutor.run(new VoidTask()
{
- final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
- if(currentUser != null)
- {
- String currentUserName = currentUser.getName();
- _attributes.put(LAST_UPDATED_BY, currentUserName);
- _attributes.put(CREATED_BY, currentUserName);
- _lastUpdatedBy = currentUserName;
- _createdBy = currentUserName;
- }
- final long currentTime = System.currentTimeMillis();
- _attributes.put(LAST_UPDATED_TIME, currentTime);
- _attributes.put(CREATED_TIME, currentTime);
- _lastUpdatedTime = currentTime;
- _createdTime = currentTime;
- CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
- try
- {
- doResolution(true, createExceptionHandler);
- doValidation(true, createExceptionHandler);
- validateOnCreate();
- registerWithParents();
- }
- catch(RuntimeException e)
+ @Override
+ public void execute()
{
- createExceptionHandler.handleException(e, this);
- }
- AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
- try
- {
- doCreation(true, unregisteringExceptionHandler);
- doOpening(true, unregisteringExceptionHandler);
- doAttainState(unregisteringExceptionHandler);
- }
- catch(RuntimeException e)
- {
- unregisteringExceptionHandler.handleException(e, this);
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if (currentUser != null)
+ {
+ String currentUserName = currentUser.getName();
+ _attributes.put(LAST_UPDATED_BY, currentUserName);
+ _attributes.put(CREATED_BY, currentUserName);
+ _lastUpdatedBy = currentUserName;
+ _createdBy = currentUserName;
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _attributes.put(CREATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+ _createdTime = currentTime;
+
+ CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+ try
+ {
+ doResolution(true, createExceptionHandler);
+ doValidation(true, createExceptionHandler);
+ validateOnCreate();
+ registerWithParents();
+ }
+ catch (RuntimeException e)
+ {
+ createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
+
+ final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+ new CreateExceptionHandler(true);
+ try
+ {
+ doCreation(true, unregisteringExceptionHandler);
+ doOpening(true, unregisteringExceptionHandler);
+ Futures.addCallback(doAttainState(unregisteringExceptionHandler),
+ new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ if (t instanceof RuntimeException)
+ {
+ unregisteringExceptionHandler.handleException((RuntimeException) t,
+ AbstractConfiguredObject.this);
+ }
+ returnVal.set(null);
+ }
+ },
+ getTaskExecutor().getExecutor());
+ }
+ catch (RuntimeException e)
+ {
+ unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ }
}
- }
+ });
+
+ return returnVal;
}
protected void validateOnCreate()
@@ -710,8 +801,33 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
}
- private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ attainState().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ catch(RuntimeException e)
+ {
+ returnVal.set(null);
+ throw e;
+ }
+ }
+ });
+ counter.incrementCount();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -719,22 +835,36 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (child instanceof AbstractConfiguredObject)
{
- AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+ final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
if (configuredObject._dynamicState.get() == DynamicState.OPENED)
{
- try
- {
- configuredObject.doAttainState(exceptionHandler);
- }
- catch (RuntimeException e)
- {
- exceptionHandler.handleException(e, configuredObject);
- }
+ counter.incrementCount();
+ Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(final Object result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ if(t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t, configuredObject);
+ }
+ counter.decrementCount();
+ }
+ },getTaskExecutor().getExecutor());
+
}
}
}
});
- attainState();
+ counter.decrementCount();
+ return returnVal;
}
protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -990,16 +1120,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void attainStateIfOpenedOrReopenFailed()
+ private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
{
if (_openComplete || getDesiredState() == State.DELETED)
{
- attainState();
+ return attainState();
}
else if (_openFailed)
{
- open();
+ return openAsync();
}
+ return Futures.immediateFuture(null);
}
protected void onOpen()
@@ -1007,10 +1138,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- protected void attainState()
+ protected ListenableFuture<Void> attainState()
{
State currentState = getState();
State desiredState = getDesiredState();
+ ListenableFuture<Void> returnVal;
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -1018,7 +1150,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
try
{
- stateChangingMethod.invoke(this);
+ returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
}
catch (IllegalAccessException e)
{
@@ -1038,7 +1170,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
}
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
+ return returnVal;
}
private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1113,46 +1254,93 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- private State setDesiredState(final State desiredState)
+ private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
- return runTask(new Task<State>()
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ runTask(new Task<Void>()
{
@Override
- public State execute()
+ public Void execute()
{
- State state = getState();
- if(desiredState == getDesiredState() && desiredState != state)
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
{
- attainStateIfOpenedOrReopenFailed();
- final State currentState = getState();
- if (currentState != state)
+ attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
{
- notifyStateChanged(state, currentState);
+ @Override
+ public void run()
+ {
+ try
+ {
+ final State currentState = getState();
+ if (currentState != state)
+ {
+ notifyStateChanged(state, currentState);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
}
- return currentState;
+ ,_taskExecutor.getExecutor());
}
else
{
- authoriseSetDesiredState(desiredState);
+ try
+ {
+ authoriseSetDesiredState(desiredState);
+ validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
- setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
- desiredState));
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- return desiredState;
+ attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ }, _taskExecutor.getExecutor());
+ }
+ else
+ {
+ returnVal.set(null);
+ }
}
- else
+ catch (RuntimeException | Error e)
{
- return getState();
+ returnVal.set(null);
+ throw e;
}
+
}
+ return null;
}
});
+ return returnVal;
}
@Override
@@ -1531,20 +1719,67 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void stop()
{
- setDesiredState(State.STOPPED);
+ doSync(setDesiredState(State.STOPPED));
}
public final void delete()
{
- if(getState() == State.UNINITIALIZED)
+ doSync(deleteAsync());
+ }
+
+ private void doSync(ListenableFuture<Void> async)
+ {
+ try
+ {
+ async.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+ }
+
+ public final ListenableFuture<Void> deleteAsync()
+ {
+ /* if(getState() == State.UNINITIALIZED)
{
_desiredState = State.DELETED;
}
- setDesiredState(State.DELETED);
+ */ return setDesiredState(State.DELETED);
}
- public final void start() { setDesiredState(State.ACTIVE); }
+ public final void start()
+ {
+ doSync(startAsync());
+ }
+
+ public ListenableFuture<Void> startAsync()
+ {
+ return setDesiredState(State.ACTIVE);
+ }
+
protected void deleted()
{
@@ -1629,19 +1864,49 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_taskExecutor.run(task);
}
+ @Override
+ public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ doSync(setAttributesAsync(attributes));
+ }
@Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
+ final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+ Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
runTask(new VoidTask()
{
@Override
public void execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
+ validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+ changeAttributes(updateAttributes);
}
});
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
}
protected void authoriseSetAttributes(final ConfiguredObject<?> proxyForValidation,
@@ -1652,7 +1917,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected void changeAttributes(final Map<String, Object> attributes)
{
- validateChange(createProxyForValidation(attributes), attributes.keySet());
Collection<String> names = getAttributeNames();
for (String name : names)
{
@@ -2193,7 +2457,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (source.getState() != State.DELETED)
{
- source.delete();
+ // TODO - RG - This isn't right :-(
+ source.deleteAsync();
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
index 5bf5e337ad..f97d2dfe14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.model;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectDependency;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf
return instance;
}
+
+ @Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ final SettableFuture<X> returnVal = SettableFuture.create();
+ final X instance = createInstance(attributes, parents);
+ final ListenableFuture<Void> createFuture = instance.createAsync();
+ createFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(instance);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ return returnVal;
+ }
+
protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents);
public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
index b421c5aaf1..c6ac7d4073 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
@@ -31,6 +31,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
final EventLogger eventLogger = _eventLogger;
- EventLogger startupLogger;
+ final EventLogger startupLogger;
if (isStartupLoggedToSystemOut())
{
//Create the composite (logging+SystemOut MessageLogger to be used during startup
@@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this);
upgrader.perform();
- Broker broker = getBroker();
+ final Broker broker = getBroker();
broker.setEventLogger(startupLogger);
- broker.open();
-
- if (broker.getState() == State.ACTIVE)
- {
- startupLogger.message(BrokerMessages.READY());
- broker.setEventLogger(eventLogger);
- }
-
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ broker.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ if (broker.getState() == State.ACTIVE)
+ {
+ startupLogger.message(BrokerMessages.READY());
+ broker.setEventLogger(eventLogger);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 944ed97ccc..c56698c60c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getMatches();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 395cb52fcd..d2ab317f0e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -238,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObject... otherParents);
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+ ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+
Class<? extends ConfiguredObject> getCategoryClass();
Class<? extends ConfiguredObject> getTypeClass();
@@ -250,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObjectRecord asObjectRecord();
void open();
+ ListenableFuture<Void> openAsync();
+
+ void close();
+ ListenableFuture<Void> closeAsync();
- ListenableFuture<Void> close();
+ ListenableFuture<Void> deleteAsync();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
index 7d4023862b..ed7c841344 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
@@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory
<X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
<X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 5026df0e19..82da0fd206 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
return factory.create(this, attributes, parents);
}
+
+ @Override
+ public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes);
+
+ return factory.createAsync(this, attributes, parents);
+ }
+
+
@Override
public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index d0c6fb041e..a93e6a602f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -40,6 +40,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.plugin.ConfiguredObjectRegistration;
@@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry
{
if(m.isAnnotationPresent(StateTransition.class))
{
- if(m.getParameterTypes().length == 0)
+ if(ListenableFuture.class.isAssignableFrom(m.getReturnType()))
{
- m.setAccessible(true);
- StateTransition annotation = m.getAnnotation(StateTransition.class);
+ if (m.getParameterTypes().length == 0)
+ {
+ m.setAccessible(true);
+ StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+ for (State state : annotation.currentState())
+ {
+ addStateTransition(state, annotation.desiredState(), m, map);
+ }
- for(State state : annotation.currentState())
+ }
+ else
{
- addStateTransition(state, annotation.desiredState(), m, map);
+ throw new ServerScopedRuntimeException(
+ "A state transition method must have no arguments. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
-
}
else
{
- throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+ throw new ServerScopedRuntimeException(
+ "A state transition method must return a ListenableFuture. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index b28441438d..1c245363a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
//children
Collection<Session> getSessions();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 7318a58640..999a3594b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
@ManagedObject
public interface Port<X extends Port<X>> extends ConfiguredObject<X>
{
@@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
void start();
+ ListenableFuture<Void> startAsync();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index cc758ba7c9..c2338c08d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void stop();
- void delete();
-
String getRedirectHost(AmqpPort<?> port);
public static interface Transaction
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 28eea21093..dfbe8b12ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -35,6 +35,9 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
@@ -235,13 +238,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_parent.isManagementMode())
{
- _managementModeAuthenticationProvider.open();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _managementModeAuthenticationProvider.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ activateWithoutManagementMode();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
+ else
+ {
+ activateWithoutManagementMode();
+ return Futures.immediateFuture(null);
+ }
+ }
+ private void activateWithoutManagementMode()
+ {
boolean hasBrokerAnyErroredChildren = false;
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index a4dbd7d5e5..8bcbba9ac4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -160,11 +160,28 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- asyncClose();
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncClose().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index 327b7ddfe9..67533f8244 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -32,6 +32,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -147,7 +150,8 @@ public class FileBasedGroupProviderImpl
GroupAdapter groupAdapter = new GroupAdapter(attrMap);
principals.add(groupAdapter);
groupAdapter.registerWithParents();
- groupAdapter.open();
+ // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread
+ groupAdapter.openAsync();
}
}
@@ -265,7 +269,7 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_groupDatabase != null)
{
@@ -282,29 +286,48 @@ public class FileBasedGroupProviderImpl
throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath()));
}
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- File file = new File(getPath());
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
-
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -377,9 +400,10 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -396,7 +420,8 @@ public class FileBasedGroupProviderImpl
attrMap.put(GroupMember.NAME, principal.getName());
GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap);
groupMemberAdapter.registerWithParents();
- groupMemberAdapter.open();
+ // todo - this will be safe, but the synchronous open should not be called from the management thread
+ groupMemberAdapter.openAsync();
members.add(groupMemberAdapter);
}
_groupPrincipal = new GroupPrincipal(getName());
@@ -459,12 +484,13 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
_groupDatabase.removeGroup(getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -522,19 +548,21 @@ public class FileBasedGroupProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
_groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index 7046f2973e..c95b3ab804 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -37,16 +37,17 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.type.TypeReference;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_store != null)
{
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl
{
throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() );
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
- private void doQuiesce()
+ private ListenableFuture<Void> doQuiesce()
{
if(_store != null)
{
_store.close();
}
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if(_store != null)
+ {
+ _store.close();
+ _store.delete();
+ deleted();
+ _authenticationProvider.setPreferencesProvider(null);
+
+ }
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- if(_store != null)
- {
- _store.close();
- _store.delete();
- deleted();
- _authenticationProvider.setPreferencesProvider(null);
+ return returnVal;
- }
- setState(State.DELETED);
}
@StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE )
- private void restart()
+ private ListenableFuture<Void> restart()
{
if (_store == null)
{
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl
_store.open();
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 7c9b439e93..cb412e8d41 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 21827ffe58..5c53eed509 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -28,6 +28,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -228,14 +231,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setState(State.DELETED);
+ returnVal.set(null);
+
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -246,12 +259,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
setState(State.ERRORED);
throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e);
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
index 870621f292..5c3000db4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)