summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java239
1 files changed, 205 insertions, 34 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 3680e476c7..220beb20f8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -37,10 +37,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -61,6 +66,7 @@ import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
@@ -384,27 +390,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return isStoreEmptyHandler.isEmpty();
}
- protected void createDefaultExchanges()
+ protected ListenableFuture<Void> createDefaultExchanges()
{
- Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>()
{
+ private static final int TOTAL_STANDARD_EXCHANGES = 4;
+ private final AtomicInteger _createdExchangeCount = new AtomicInteger();
+ private SettableFuture<Void> _future = SettableFuture.create();
+
@Override
- public Void run()
+ public ListenableFuture<Void> run()
{
addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- return null;
+ return _future;
+ }
+
+ private void standardExchangeCreated()
+ {
+ if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES)
+ {
+ _future.set(null);
+ }
}
- void addStandardExchange(String name, String type)
+ ListenableFuture<Void> addStandardExchange(String name, String type)
{
+
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, type);
attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
- childAdded(addExchange(attributes));
+ final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ Futures.addCallback(future, new FutureCallback<ExchangeImpl>()
+ {
+ @Override
+ public void onSuccess(final ExchangeImpl result)
+ {
+ try
+ {
+ childAdded(result);
+ }
+ finally
+ {
+ standardExchangeCreated();
+ }
+
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ standardExchangeCreated();
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
});
}
@@ -747,6 +791,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
+ private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes)
+ throws ExchangeExistsException, ReservedExchangeNameException,
+ NoFactoryForTypeException
+ {
+ try
+ {
+ ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this);
+ return result;
+ }
+ catch (DuplicateNameException e)
+ {
+ throw new ExchangeExistsException(getExchange(e.getName()));
+ }
+
+ }
+
+
@Override
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
@@ -777,9 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
setState(State.UNAVAILABLE);
+
+ return super.beforeClose();
}
@Override
@@ -1277,37 +1340,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- closeChildren();
- shutdownHouseKeeping();
- closeMessageStore();
- setState(State.STOPPED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeChildren().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ shutdownHouseKeeping();
+ closeMessageStore();
+ setState(State.STOPPED);
+
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
String hostName = getName();
- close();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageStore ms = getMessageStore();
+ if (ms != null)
+ {
+ try
+ {
+ ms.onDelete(AbstractVirtualHost.this);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occurred on message store deletion", e);
+ }
+ }
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- MessageStore ms = getMessageStore();
- if (ms != null)
- {
- try
- {
- ms.onDelete(this);
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on message store deletion", e);
- }
- }
- deleted();
- setState(State.DELETED);
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -1496,7 +1598,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
- private void onActivate()
+ private ListenableFuture<Void> onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory());
@@ -1516,9 +1618,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (isStoreEmpty())
{
- createDefaultExchanges();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ createDefaultExchanges().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ postCreateDefaultExchangeTasks();
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
+ else
+ {
+ postCreateDefaultExchangeTasks();
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ private void postCreateDefaultExchangeTasks()
+ {
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
_messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1553,9 +1674,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker);
}
}
+ private static class ChildCounter
+ {
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
@StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- private void onRestart()
+ private ListenableFuture<Void> onRestart()
{
resetStatistics();
@@ -1586,6 +1730,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
new GenericRecoverer(this).recover(records);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onActivate().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ }
+ });
+ counter.incrementCount();
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
{
@Override
@@ -1596,14 +1759,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public void performAction(final ConfiguredObject<?> object)
{
- object.open();
+ counter.incrementCount();
+ object.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ counter.decrementCount();
+ }
+ }, getTaskExecutor().getExecutor());
}
});
return null;
}
});
-
- onActivate();
+ counter.decrementCount();
+ return returnVal;
}
private class FileSystemSpaceChecker extends HouseKeepingTask