diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode')
3 files changed, 111 insertions, 30 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 03c30a9cd4..fd73963b68 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -29,6 +29,8 @@ import java.util.Map; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -68,7 +70,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -107,15 +109,21 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard if (host != null) { final VirtualHost<?,?,?> recoveredHost = host; - Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - recoveredHost.open(); - return null; - } - }); + final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> run() + { + return recoveredHost.openAsync(); + + } + }); + return openFuture; + } + else + { + return Futures.immediateFuture(null); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index a343b71501..8e08554358 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -37,6 +37,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -119,16 +123,47 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { + final SettableFuture<Void> returnVal = SettableFuture.create(); + try { - activate(); - setState(State.ACTIVE); + Futures.addCallback(activate(), + new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + setState(State.ACTIVE); + } + finally + { + returnVal.set(null); + } + + } + + @Override + public void onFailure(final Throwable t) + { + + setState(State.ERRORED); + returnVal.set(null); + if (_broker.isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", t); + } + } + }, getTaskExecutor().getExecutor() + ); } catch(RuntimeException e) { setState(State.ERRORED); + returnVal.set(null); if (_broker.isManagementMode()) { LOGGER.warn("Failed to make " + this + " active.", e); @@ -138,6 +173,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< throw e; } } + return returnVal; } @Override @@ -180,39 +216,73 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { + final SettableFuture<Void> returnVal = SettableFuture.create(); setState(State.DELETED); deleteVirtualHostIfExists(); - close(); - deleted(); - DurableConfigurationStore configurationStore = getConfigurationStore(); - if (configurationStore != null) + final ListenableFuture<Void> closeFuture = closeAsync(); + closeFuture.addListener(new Runnable() { - configurationStore.onDelete(this); - } + @Override + public void run() + { + try + { + deleted(); + DurableConfigurationStore configurationStore = getConfigurationStore(); + if (configurationStore != null) + { + configurationStore.onDelete(AbstractVirtualHostNode.this); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + return returnVal; + } - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); if (virtualHost != null) { - virtualHost.delete(); + return virtualHost.deleteAsync(); + } + else + { + return Futures.immediateFuture(null); } } @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - stopAndSetStateTo(State.STOPPED); + return stopAndSetStateTo(State.STOPPED); } - protected void stopAndSetStateTo(State stoppedState) + protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState) { - closeChildren(); - closeConfigurationStoreSafely(); - setState(stoppedState); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + closeConfigurationStoreSafely(); + setState(stoppedState); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override @@ -270,7 +340,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< protected abstract DurableConfigurationStore createConfigurationStore(); - protected abstract void activate(); + protected abstract ListenableFuture<Void> activate(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java index c94d113514..8a160f83d7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeImpl } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { try { @@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeImpl throw e; } } + return Futures.immediateFuture(null); } @Override |