diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java | 239 |
1 files changed, 205 insertions, 34 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 3680e476c7..220beb20f8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -37,10 +37,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -61,6 +66,7 @@ import org.apache.qpid.server.message.MessageNode; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; @@ -384,27 +390,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return isStoreEmptyHandler.isEmpty(); } - protected void createDefaultExchanges() + protected ListenableFuture<Void> createDefaultExchanges() { - Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>() { + private static final int TOTAL_STANDARD_EXCHANGES = 4; + private final AtomicInteger _createdExchangeCount = new AtomicInteger(); + private SettableFuture<Void> _future = SettableFuture.create(); + @Override - public Void run() + public ListenableFuture<Void> run() { addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - return null; + return _future; + } + + private void standardExchangeCreated() + { + if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES) + { + _future.set(null); + } } - void addStandardExchange(String name, String type) + ListenableFuture<Void> addStandardExchange(String name, String type) { + Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, name); attributes.put(Exchange.TYPE, type); attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); - childAdded(addExchange(attributes)); + final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes); + final SettableFuture<Void> returnVal = SettableFuture.create(); + Futures.addCallback(future, new FutureCallback<ExchangeImpl>() + { + @Override + public void onSuccess(final ExchangeImpl result) + { + try + { + childAdded(result); + } + finally + { + standardExchangeCreated(); + } + + } + + @Override + public void onFailure(final Throwable t) + { + standardExchangeCreated(); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } }); } @@ -747,6 +791,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } + private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes) + throws ExchangeExistsException, ReservedExchangeNameException, + NoFactoryForTypeException + { + try + { + ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this); + return result; + } + catch (DuplicateNameException e) + { + throw new ExchangeExistsException(getExchange(e.getName())); + } + + } + + @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException @@ -777,9 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected void beforeClose() + protected ListenableFuture<Void> beforeClose() { setState(State.UNAVAILABLE); + + return super.beforeClose(); } @Override @@ -1277,37 +1340,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - closeChildren(); - shutdownHouseKeeping(); - closeMessageStore(); - setState(State.STOPPED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeChildren().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + shutdownHouseKeeping(); + closeMessageStore(); + setState(State.STOPPED); + + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { + final SettableFuture<Void> returnVal = SettableFuture.create(); String hostName = getName(); - close(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + MessageStore ms = getMessageStore(); + if (ms != null) + { + try + { + ms.onDelete(AbstractVirtualHost.this); + } + catch (Exception e) + { + _logger.warn("Exception occurred on message store deletion", e); + } + } + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - MessageStore ms = getMessageStore(); - if (ms != null) - { - try - { - ms.onDelete(this); - } - catch (Exception e) - { - _logger.warn("Exception occurred on message store deletion", e); - } - } - deleted(); - setState(State.DELETED); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -1496,7 +1598,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) - private void onActivate() + private ListenableFuture<Void> onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory()); @@ -1516,9 +1618,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if (isStoreEmpty()) { - createDefaultExchanges(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + createDefaultExchanges().addListener(new Runnable() + { + @Override + public void run() + { + postCreateDefaultExchangeTasks(); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + return returnVal; } + else + { + postCreateDefaultExchangeTasks(); + return Futures.immediateFuture(null); + } + } + + private void postCreateDefaultExchangeTasks() + { if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1553,9 +1674,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); } } + private static class ChildCounter + { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - private void onRestart() + private ListenableFuture<Void> onRestart() { resetStatistics(); @@ -1586,6 +1730,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte new GenericRecoverer(this).recover(records); + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + onActivate().addListener( + new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor() + ); + } + }); + counter.incrementCount(); Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() { @Override @@ -1596,14 +1759,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public void performAction(final ConfiguredObject<?> object) { - object.open(); + counter.incrementCount(); + object.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + counter.decrementCount(); + } + }, getTaskExecutor().getExecutor()); } }); return null; } }); - - onActivate(); + counter.decrementCount(); + return returnVal; } private class FileSystemSpaceChecker extends HouseKeepingTask |