summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java777
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java109
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java70
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java10
20 files changed, 1069 insertions, 202 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index e63638213e..529aa230d4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -41,12 +41,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -68,6 +79,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.util.Strings;
@@ -162,7 +174,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+ @ManagedAttributeField
private State _desiredState;
private boolean _openComplete;
private boolean _openFailed;
@@ -439,24 +451,84 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final void open()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(openAsync());
+ }
+
+
+ public final ListenableFuture<Void> openAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ return doAttainState(exceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ return Futures.immediateFuture(null);
+ }
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ }
+ });
+
+ }
+
+ protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+ {
+ final SettableFuture<T> returnVal = SettableFuture.create();
+
+ _taskExecutor.submit(new Task<Void>()
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler);
- }
- catch(RuntimeException e)
+
+ @Override
+ public Void execute()
{
- exceptionHandler.handleException(e, this);
+ try
+ {
+ Futures.addCallback(action.call(), new FutureCallback<T>()
+ {
+ @Override
+ public void onSuccess(final T result)
+ {
+ returnVal.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ returnVal.setException(e);
+ }
+ return null;
}
- }
+ });
+
+ return returnVal;
}
+
+
public void registerWithParents()
{
for(ConfiguredObject<?> parent : _parents.values())
@@ -468,17 +540,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- protected void closeChildren()
+ private class ChildCounter
{
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
+
+ protected final ListenableFuture<Void> closeChildren()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
+ }
+ });
+ counter.incrementCount();
+
+
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- child.close();
+ counter.incrementCount();
+ ListenableFuture<Void> close = child.closeAsync();
+ Futures.addCallback(close, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ LOGGER.error("Exception occurred while closing "
+ + child.getClass().getSimpleName()
+ + " : '"
+ + child.getName()
+ + "'", t);
+ // No need to decrement counter as setting the exception will complete the future
+ returnVal.setException(t);
+ }
+ }, MoreExecutors.sameThreadExecutor());
}
});
+ counter.decrementCount();
+
for(Collection<ConfiguredObject<?>> childList : _children.values())
{
childList.clear();
@@ -494,23 +627,60 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
childNameMap.clear();
}
+ return returnVal;
+ }
+
+ @Override
+ public void close()
+ {
+ doSync(closeAsync());
}
@Override
- public final void close()
+ public final ListenableFuture<Void> closeAsync()
{
- if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- beforeClose();
- closeChildren();
- onClose();
- unregister(false);
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ {
+
+ return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return closeChildren();
+ }
+ }).then(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+ }
+ });
+ }
+ else
+ {
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+ return Futures.immediateFuture(null);
+ }
+ }
+ });
- }
}
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
+ return Futures.immediateFuture(null);
}
protected void onClose()
@@ -519,48 +689,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void create()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(createAsync());
+ }
+
+ public final ListenableFuture<Void> createAsync()
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
- if(currentUser != null)
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- String currentUserName = currentUser.getName();
- _attributes.put(LAST_UPDATED_BY, currentUserName);
- _attributes.put(CREATED_BY, currentUserName);
- _lastUpdatedBy = currentUserName;
- _createdBy = currentUserName;
- }
- final long currentTime = System.currentTimeMillis();
- _attributes.put(LAST_UPDATED_TIME, currentTime);
- _attributes.put(CREATED_TIME, currentTime);
- _lastUpdatedTime = currentTime;
- _createdTime = currentTime;
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if (currentUser != null)
+ {
+ String currentUserName = currentUser.getName();
+ _attributes.put(LAST_UPDATED_BY, currentUserName);
+ _attributes.put(CREATED_BY, currentUserName);
+ _lastUpdatedBy = currentUserName;
+ _createdBy = currentUserName;
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _attributes.put(CREATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+ _createdTime = currentTime;
+
+ CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+ try
+ {
+ doResolution(true, createExceptionHandler);
+ doValidation(true, createExceptionHandler);
+ validateOnCreate();
+ registerWithParents();
+ }
+ catch (RuntimeException e)
+ {
+ createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
- CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
- try
- {
- doResolution(true, createExceptionHandler);
- doValidation(true, createExceptionHandler);
- validateOnCreate();
- registerWithParents();
- }
- catch(RuntimeException e)
- {
- createExceptionHandler.handleException(e, this);
- }
+ final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+ new CreateExceptionHandler(true);
+
+ try
+ {
+ doCreation(true, unregisteringExceptionHandler);
+ doOpening(true, unregisteringExceptionHandler);
+ return doAttainState(unregisteringExceptionHandler);
+ }
+ catch (RuntimeException e)
+ {
+ unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
+ }
+ return Futures.immediateFuture(null);
- AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
- try
- {
- doCreation(true, unregisteringExceptionHandler);
- doOpening(true, unregisteringExceptionHandler);
- doAttainState(unregisteringExceptionHandler);
- }
- catch(RuntimeException e)
- {
- unregisteringExceptionHandler.handleException(e, this);
}
- }
+ });
+
}
protected void validateOnCreate()
@@ -610,8 +797,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
}
- private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ attainState().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ catch(RuntimeException e)
+ {
+ try
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ catch(Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }
+ }
+ });
+ counter.incrementCount();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -619,22 +838,43 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (child instanceof AbstractConfiguredObject)
{
- AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+ final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
if (configuredObject._dynamicState.get() == DynamicState.OPENED)
{
- try
- {
- configuredObject.doAttainState(exceptionHandler);
- }
- catch (RuntimeException e)
- {
- exceptionHandler.handleException(e, configuredObject);
- }
+ counter.incrementCount();
+ Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(final Object result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ try
+ {
+ if (t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t,
+ configuredObject);
+ }
+ }
+ finally
+ {
+ counter.decrementCount();
+ }
+ }
+ },getTaskExecutor().getExecutor());
+
}
}
}
});
- attainState();
+ counter.decrementCount();
+ return returnVal;
}
protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -890,16 +1130,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void attainStateIfOpenedOrReopenFailed()
+ private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
{
if (_openComplete || getDesiredState() == State.DELETED)
{
- attainState();
+ return attainState();
}
else if (_openFailed)
{
- open();
+ return openAsync();
}
+ return Futures.immediateFuture(null);
}
protected void onOpen()
@@ -907,10 +1148,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- protected void attainState()
+ protected ListenableFuture<Void> attainState()
{
State currentState = getState();
State desiredState = getDesiredState();
+ ListenableFuture<Void> returnVal;
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -918,7 +1160,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
try
{
- stateChangingMethod.invoke(this);
+ returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
}
catch (IllegalAccessException e)
{
@@ -938,7 +1180,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
}
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
+ return returnVal;
}
private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1013,44 +1264,72 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- private State setDesiredState(final State desiredState)
+ private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
-
- return runTask(new Task<State>()
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
+ {
+ return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public State execute()
+ final State currentState = getState();
+ if (currentState != state)
{
+ notifyStateChanged(state, currentState);
+ }
- State state = getState();
- if(desiredState == getDesiredState() && desiredState != state)
- {
- attainStateIfOpenedOrReopenFailed();
- final State currentState = getState();
- if (currentState != state)
- {
- notifyStateChanged(state, currentState);
- }
- return currentState;
- }
- else
- {
- setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
- desiredState));
+ }
+ });
+ }
+ else
+ {
+ ConfiguredObject<?> proxyForValidation =
+ createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState));
+ Set<String> desiredStateOnlySet = Collections.unmodifiableSet(
+ Collections.singleton(ConfiguredObject.DESIRED_STATE));
+ authoriseSetAttributes(proxyForValidation, desiredStateOnlySet);
+ validateChange(proxyForValidation, desiredStateOnlySet);
+
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
+
+ return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+
+ }
+ }
+ );
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ }
+ });
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- return desiredState;
- }
- else
- {
- return getState();
- }
- }
- }
- });
}
@Override
@@ -1429,20 +1708,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void stop()
{
- setDesiredState(State.STOPPED);
+ doSync(setDesiredState(State.STOPPED));
}
public final void delete()
{
- if(getState() == State.UNINITIALIZED)
+ doSync(deleteAsync());
+ }
+
+ protected final <R> R doSync(ListenableFuture<R> async)
+ {
+ try
+ {
+ return async.get();
+ }
+ catch (InterruptedException e)
{
- _desiredState = State.DELETED;
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
}
- setDesiredState(State.DELETED);
+ }
+
+ public final ListenableFuture<Void> deleteAsync()
+ {
+ return setDesiredState(State.DELETED);
+ }
+
+ public final void start()
+ {
+ doSync(startAsync());
+ }
+ public ListenableFuture<Void> startAsync()
+ {
+ return setDesiredState(State.ACTIVE);
}
- public final void start() { setDesiredState(State.ACTIVE); }
protected void deleted()
{
@@ -1527,24 +1848,175 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_taskExecutor.run(task);
}
+ @Override
+ public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ doSync(setAttributesAsync(attributes));
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ second.run();
+ returnVal.set(null);
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
+
+ public static interface ChainedListenableFuture extends ListenableFuture<Void>
+ {
+ ChainedListenableFuture then(Runnable r);
+ ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+ }
+
+ public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+ {
+ private final Executor _exector;
+
+ public ChainedSettableFuture(final Executor executor)
+ {
+ _exector = executor;
+ }
+
+ @Override
+ public boolean set(Void value)
+ {
+ return super.set(value);
+ }
+
+ @Override
+ public boolean setException(Throwable throwable)
+ {
+ return super.setException(throwable);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Runnable r)
+ {
+ return doAfter(_exector, this, r);
+ }
+
+ @Override
+ public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+ {
+ return doAfter(_exector, this,r);
+ }
+ }
+
+ protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ return doAfter(getTaskExecutor().getExecutor(), first, second);
+ }
+
+ protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+ {
+ final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+ Futures.addCallback(first, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ final ListenableFuture<Void> future = second.call();
+ Futures.addCallback(future, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ }
+ catch(Throwable e)
+ {
+ returnVal.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ returnVal.setException(t);
+ }
+ }, executor);
+
+ return returnVal;
+ }
@Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
+ final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+ Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
runTask(new VoidTask()
{
@Override
public void execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
+ validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+ changeAttributes(updateAttributes);
}
});
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
}
protected void changeAttributes(final Map<String, Object> attributes)
{
- validateChange(createProxyForValidation(attributes), attributes.keySet());
Collection<String> names = getAttributeNames();
for (String name : names)
{
@@ -1938,6 +2410,74 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private static class CloseResult implements FutureResult
+ {
+ private volatile FutureResult _childFutureResult;
+
+ @Override
+ public boolean isComplete()
+ {
+ return _childFutureResult != null && _childFutureResult.isComplete();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ synchronized (this)
+ {
+ while (_childFutureResult == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion();
+
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ synchronized (this)
+ {
+ while (_childFutureResult == null && remaining > 0)
+ {
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ remaining = startTime + timeout - System.currentTimeMillis();
+
+ if(remaining <= 0)
+ {
+ throw new TimeoutException("Completion did not occur within given timeout: " + timeout);
+ }
+ }
+ }
+ _childFutureResult.waitForCompletion(remaining);
+
+ }
+
+ public synchronized void setChildFutureResult(final FutureResult childFutureResult)
+ {
+ _childFutureResult = childFutureResult;
+ notifyAll();
+ }
+ }
+
private static class AttributeGettingHandler implements InvocationHandler
{
@@ -2127,7 +2667,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (source.getState() != State.DELETED)
{
- source.delete();
+ // TODO - RG - This isn't right :-(
+ source.deleteAsync();
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
index 5bf5e337ad..f97d2dfe14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.model;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectDependency;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf
return instance;
}
+
+ @Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ final SettableFuture<X> returnVal = SettableFuture.create();
+ final X instance = createInstance(attributes, parents);
+ final ListenableFuture<Void> createFuture = instance.createAsync();
+ createFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(instance);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ return returnVal;
+ }
+
protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents);
public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
index b421c5aaf1..c6ac7d4073 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
@@ -31,6 +31,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
final EventLogger eventLogger = _eventLogger;
- EventLogger startupLogger;
+ final EventLogger startupLogger;
if (isStartupLoggedToSystemOut())
{
//Create the composite (logging+SystemOut MessageLogger to be used during startup
@@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this);
upgrader.perform();
- Broker broker = getBroker();
+ final Broker broker = getBroker();
broker.setEventLogger(startupLogger);
- broker.open();
-
- if (broker.getState() == State.ACTIVE)
- {
- startupLogger.message(BrokerMessages.READY());
- broker.setEventLogger(eventLogger);
- }
-
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ broker.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ if (broker.getState() == State.ACTIVE)
+ {
+ startupLogger.message(BrokerMessages.READY());
+ broker.setEventLogger(eventLogger);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 944ed97ccc..c56698c60c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getMatches();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
new file mode 100644
index 0000000000..5e9d794e14
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.model;
+
+
+public interface CloseFuture
+{
+ public void runWhenComplete(final Runnable closeRunnable);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 2d60879861..d2ab317f0e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -236,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObject... otherParents);
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+ ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+
Class<? extends ConfiguredObject> getCategoryClass();
Class<? extends ConfiguredObject> getTypeClass();
@@ -248,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObjectRecord asObjectRecord();
void open();
+ ListenableFuture<Void> openAsync();
void close();
+ ListenableFuture<Void> closeAsync();
+
+ ListenableFuture<Void> deleteAsync();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
index 7d4023862b..ed7c841344 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
@@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory
<X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
<X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 5026df0e19..82da0fd206 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
return factory.create(this, attributes, parents);
}
+
+ @Override
+ public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes);
+
+ return factory.createAsync(this, attributes, parents);
+ }
+
+
@Override
public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index d0c6fb041e..a93e6a602f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -40,6 +40,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.plugin.ConfiguredObjectRegistration;
@@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry
{
if(m.isAnnotationPresent(StateTransition.class))
{
- if(m.getParameterTypes().length == 0)
+ if(ListenableFuture.class.isAssignableFrom(m.getReturnType()))
{
- m.setAccessible(true);
- StateTransition annotation = m.getAnnotation(StateTransition.class);
+ if (m.getParameterTypes().length == 0)
+ {
+ m.setAccessible(true);
+ StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+ for (State state : annotation.currentState())
+ {
+ addStateTransition(state, annotation.desiredState(), m, map);
+ }
- for(State state : annotation.currentState())
+ }
+ else
{
- addStateTransition(state, annotation.desiredState(), m, map);
+ throw new ServerScopedRuntimeException(
+ "A state transition method must have no arguments. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
-
}
else
{
- throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+ throw new ServerScopedRuntimeException(
+ "A state transition method must return a ListenableFuture. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index b28441438d..1c245363a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
//children
Collection<Session> getSessions();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 7318a58640..999a3594b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
@ManagedObject
public interface Port<X extends Port<X>> extends ConfiguredObject<X>
{
@@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
void start();
+ ListenableFuture<Void> startAsync();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index cc758ba7c9..c2338c08d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void stop();
- void delete();
-
String getRedirectHost(AmqpPort<?> port);
public static interface Transaction
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index be1d6ebf59..036c4d7716 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -35,6 +35,9 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
@@ -234,13 +237,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_parent.isManagementMode())
{
- _managementModeAuthenticationProvider.open();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _managementModeAuthenticationProvider.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ activateWithoutManagementMode();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
+ else
+ {
+ activateWithoutManagementMode();
+ return Futures.immediateFuture(null);
+ }
+ }
+ private void activateWithoutManagementMode()
+ {
boolean hasBrokerAnyErroredChildren = false;
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index e03904789d..8bcbba9ac4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -27,10 +27,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
private AMQConnectionModel _underlyingConnection;
+ private final AtomicBoolean _closing = new AtomicBoolean();
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- closeUnderlyingConnection();
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncClose().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
+ }
+
+ @Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ return asyncClose();
+
+ }
+
+ private ListenableFuture<Void> asyncClose()
+ {
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
+
+ _underlyingConnection.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ closeFuture.set(null);
+ }
+ });
+
+ _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
}
@Override
protected void onClose()
{
- closeUnderlyingConnection();
}
@Override
@@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
// SessionAdapter installs delete task to cause session model object to delete
}
- private void closeUnderlyingConnection()
+
+ private static class ConnectionCloseFuture implements CloseFuture
{
- if (_underlyingClosed.compareAndSet(false, true))
+ private boolean _closed;
+
+ public synchronized void connectionClosed()
{
- _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
- try
+ _closed = true;
+ notifyAll();
+
+ }
+
+ @Override
+ public void runWhenComplete(final Runnable closeRunnable)
+ {
+ if (_closed )
{
- _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeRunnable.run();
}
- catch (Exception e)
+ else
{
- LOGGER.warn("Exception closing connection "
- + _underlyingConnection.getConnectionId()
- + " from "
- + _underlyingConnection.getRemoteAddressString(), e);
- }
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionCloseFuture.this)
+ {
+ while (!_closed)
+ {
+ try
+ {
+ ConnectionCloseFuture.this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ closeRunnable.run();
+ }
+ }
+ });
+
+ t.setDaemon(true);
+ t.start();
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index fda8a6f2e9..1a119be32d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -31,6 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -145,7 +148,8 @@ public class FileBasedGroupProviderImpl
GroupAdapter groupAdapter = new GroupAdapter(attrMap);
principals.add(groupAdapter);
groupAdapter.registerWithParents();
- groupAdapter.open();
+ // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread
+ groupAdapter.openAsync();
}
}
@@ -261,7 +265,7 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_groupDatabase != null)
{
@@ -278,29 +282,48 @@ public class FileBasedGroupProviderImpl
throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath()));
}
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- File file = new File(getPath());
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
-
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -352,9 +375,10 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -371,7 +395,8 @@ public class FileBasedGroupProviderImpl
attrMap.put(GroupMember.NAME, principal.getName());
GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap);
groupMemberAdapter.registerWithParents();
- groupMemberAdapter.open();
+ // todo - this will be safe, but the synchronous open should not be called from the management thread
+ groupMemberAdapter.openAsync();
members.add(groupMemberAdapter);
}
_groupPrincipal = new GroupPrincipal(getName());
@@ -432,11 +457,12 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_groupDatabase.removeGroup(getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -494,17 +520,19 @@ public class FileBasedGroupProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index 2b77b0d2a9..500df8cb87 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -37,16 +37,17 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.type.TypeReference;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_store != null)
{
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl
{
throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() );
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
- private void doQuiesce()
+ private ListenableFuture<Void> doQuiesce()
{
if(_store != null)
{
_store.close();
}
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if(_store != null)
+ {
+ _store.close();
+ _store.delete();
+ deleted();
+ _authenticationProvider.setPreferencesProvider(null);
+
+ }
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- if(_store != null)
- {
- _store.close();
- _store.delete();
- deleted();
- _authenticationProvider.setPreferencesProvider(null);
+ return returnVal;
- }
- setState(State.DELETED);
}
@StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE )
- private void restart()
+ private ListenableFuture<Void> restart()
{
if (_store == null)
{
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl
_store.open();
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 7c9b439e93..cb412e8d41 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 791bbe4dd3..0e6f18a70a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -27,6 +27,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -226,14 +229,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setState(State.DELETED);
+ returnVal.set(null);
+
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -244,12 +257,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
setState(State.ERRORED);
throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e);
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 43cb5f0c62..350f137a04 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
private final Broker<?> _broker;
private AcceptingTransport _transport;
+ private final AtomicBoolean _closing = new AtomicBoolean();
+ private final SettableFuture _noConnectionsRemain = SettableFuture.create();
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
}
@Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ if (_connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
+ return _noConnectionsRemain;
+ }
+
+ @Override
protected void onClose()
{
if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
{
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
}
+
+
_transport.close();
}
}
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
_connectionCountWarningGiven.compareAndSet(true,false);
}
+ if (_closing.get() && _connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
return openConnections;
}
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
@Override
public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
{
- return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections;
+ return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
index 870621f292..5c3000db4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)