summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-09 17:12:14 +0000
committerKeith Wall <kwall@apache.org>2015-03-09 17:12:14 +0000
commit98faeab2840203c8e4eb4526afe0fd20a596aa28 (patch)
tree665f6493dcca389d39b0a5496ad4a0eaab160ef8
parent10b21b20fbd892d19ae64084165ec8942f864eac (diff)
downloadqpid-python-98faeab2840203c8e4eb4526afe0fd20a596aa28.tar.gz
Add sync/async varients to most ACO methods
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665306 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java9
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java156
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java463
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java70
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java74
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java61
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java59
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java237
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java108
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java28
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java6
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java35
-rw-r--r--qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java5
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java3
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java9
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java5
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java10
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java1
59 files changed, 1494 insertions, 406 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index dfbdce4399..926e9a956f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -24,11 +24,12 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.security.AccessControlException;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.rep.MasterStateException;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -150,7 +151,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
}
@StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
String nodeName = getName();
@@ -170,6 +171,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
{
throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
}
+
+ return Futures.immediateFuture(null);
}
protected void afterSetRole()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 2000897e87..6a4e048e5c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LogWriteException;
import com.sleepycat.je.rep.NodeState;
@@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
shutdownOnIntruder(nodeAddress);
+
throw new IllegalStateException("Intruder node detected: " + nodeAddress);
}
}
@@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
environmentFacade.setPermittedNodes(_permittedNodes);
}
+
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- try
- {
- super.doStop();
- }
- finally
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> superFuture = super.doStop();
+ Futures.addCallback(superFuture, new FutureCallback<Void>()
{
- closeEnvironment();
+ @Override
+ public void onSuccess(final Void result)
+ {
+ doFinally();
+ }
- // closing the environment does not cause a state change. Adjust the role
- // so that our observers will see DETACHED rather than our previous role in the group.
- _lastRole.set(NodeRole.DETACHED);
- attributeSet(ROLE, _role, NodeRole.DETACHED);
- }
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ doFinally();
+ }
+
+ private void doFinally()
+ {
+ try
+ {
+ closeEnvironment();
+
+ // closing the environment does not cause a state change. Adjust the role
+ // so that our observers will see DETACHED rather than our previous role in the group.
+ _lastRole.set(NodeRole.DETACHED);
+ attributeSet(ROLE, _role, NodeRole.DETACHED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ });
+ return returnVal;
}
private void closeEnvironment()
@@ -397,43 +427,60 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
- // get helpers before close. on close all children are closed and not available anymore
- Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
- super.doDelete();
-
- if (getConfigurationStore() != null)
- {
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
- }
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- if (getState() == State.DELETED && !helpers.isEmpty())
+ // get helpers before close. on close all children are closed and not available anymore
+ final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
+ final ListenableFuture<Void> superFuture = super.doDelete();
+ superFuture.addListener(new Runnable()
{
- try
+ @Override
+ public void run()
{
- new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
- }
- catch(DatabaseException e)
- {
- LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
- + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ try
+ {
+ if (getConfigurationStore() != null)
+ {
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
+ }
+
+ if (getState() == State.DELETED && !helpers.isEmpty())
+ {
+ try
+ {
+ new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
+ }
+ catch(DatabaseException e)
+ {
+ LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
+ + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ }
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
}
- }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
@Override
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade();
if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster()
&& replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1)
{
- super.deleteVirtualHostIfExists();
+ return super.deleteVirtualHostIfExists();
}
else
{
- closeVirtualHostIfExist();
+ return closeVirtualHostIfExist();
}
}
@@ -553,7 +600,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
getConfigurationStore().upgradeStoreStructure();
@@ -640,7 +687,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
Map<String, Object> hostAttributes = new HashMap<>();
hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -654,13 +701,32 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- protected void closeVirtualHostIfExist()
+ protected ListenableFuture<Void> closeVirtualHostIfExist()
{
- VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- virtualHost.close();
- childRemoved(virtualHost);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ virtualHost.closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ childRemoved(virtualHost);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -687,15 +753,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
onReplica();
break;
case DETACHED:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
case UNKNOWN:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
default:
LOGGER.error("Unexpected state change: " + state);
}
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
finally
{
NodeRole newRole = NodeRole.fromJeState(state);
@@ -1137,7 +1207,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
- close();
+ closeAsync();
}
finally
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index 8c389e6d22..bc5d30a0f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -110,8 +110,8 @@ public class Broker implements BrokerShutdownProvider
{
if(_systemConfig != null)
{
- ListenableFuture<Void> closeResult = _systemConfig.close();
- closeResult.get(5000l, TimeUnit.MILLISECONDS);
+ ListenableFuture<Void> closeResult = _systemConfig.closeAsync();
+ closeResult.get(30000l, TimeUnit.MILLISECONDS);
}
_taskExecutor.stop();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 6012e2e8db..0463bb64a3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.exchange.AbstractExchange;
@@ -195,7 +198,7 @@ public class BindingImpl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
@@ -208,12 +211,14 @@ public class BindingImpl
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index fecb4de7f5..0f59494850 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -49,6 +49,7 @@ public class TaskExecutorImpl implements TaskExecutor
private volatile Thread _taskThread;
private final AtomicBoolean _running = new AtomicBoolean();
private volatile ExecutorService _executor;
+ private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
@Override
@@ -68,7 +69,7 @@ public class TaskExecutorImpl implements TaskExecutor
@Override
public Thread newThread(Runnable r)
{
- _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+ _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this);
return _taskThread;
}
});
@@ -281,7 +282,7 @@ public class TaskExecutorImpl implements TaskExecutor
@Override
public Executor getExecutor()
{
- return _executor;
+ return _wrappedExecutor;
}
public boolean isTaskExecutorThread()
@@ -380,4 +381,41 @@ public class TaskExecutorImpl implements TaskExecutor
return get();
}
}
+
+ private class ImmediateIfSameThreadExecutor implements Executor
+ {
+
+ @Override
+ public void execute(final Runnable command)
+ {
+ if(isTaskExecutorThread()
+ || (_executor == null && (Thread.currentThread() instanceof TaskThread
+ && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this)))
+ {
+ command.run();
+ }
+ else
+ {
+ _executor.execute(command);
+ }
+
+ }
+ }
+
+ private static class TaskThread extends Thread
+ {
+
+ private final TaskExecutorImpl _taskExecutor;
+
+ public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
+ {
+ super(r, name);
+ _taskExecutor = taskExecutor;
+ }
+
+ public TaskExecutorImpl getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 83784d4b25..e17eca8614 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -67,7 +67,7 @@ public interface ConsumerImpl
boolean seesRequeues();
- ListenableFuture<Void> close();
+ void close();
boolean trySendLock();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index cf23e3dd91..4e7cd4a151 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -37,10 +37,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -602,9 +605,18 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
{
- return makeBinding(null, bindingKey, queue, arguments, false);
+ return getTaskExecutor().run(new Task<Boolean>()
+ {
+
+ @Override
+ public Boolean execute()
+ {
+ return makeBinding(null, bindingKey, queue, arguments, false);
+ }
+ });
+
}
@Override
@@ -643,7 +655,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
doRemoveBinding(b);
queue.removeBinding(b);
- b.delete();
+ // TODO - RG - Fix bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
}
@@ -695,7 +715,8 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
attributes.put(Binding.ARGUMENTS, arguments);
BindingImpl b = new BindingImpl(attributes, queue, this);
- b.create(); // Must be called before addBinding as it resolves automated attributes.
+ // TODO - RG - Fix Bindings
+ b.createAsync(); // Must be called before addBinding as it resolves automated attributes.
addBinding(b);
return true;
@@ -732,22 +753,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -757,8 +780,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
catch (ExchangeIsAlternateException | RequiredExchangeException e)
{
- return;
+
}
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index 3e377ebaa6..be98665df8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -107,8 +107,4 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
void bindingRemoved(ExchangeImpl exchange, BindingImpl binding);
}
- public void addBindingListener(BindingListener listener);
-
- public void removeBindingListener(BindingListener listener);
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 2269999e1d..57eb16c0be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -43,12 +43,14 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -169,7 +171,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+ @ManagedAttributeField
private State _desiredState;
private boolean _openComplete;
private boolean _openFailed;
@@ -446,24 +448,58 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public final void open()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(openAsync());
+ }
+
+
+ public final ListenableFuture<Void> openAsync()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _taskExecutor.run(new VoidTask()
{
- _openFailed = false;
- OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
- try
- {
- doResolution(true, exceptionHandler);
- doValidation(true, exceptionHandler);
- doOpening(true, exceptionHandler);
- doAttainState(exceptionHandler);
- }
- catch(RuntimeException e)
+
+ @Override
+ public void execute()
{
- exceptionHandler.handleException(e, this);
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ _openFailed = false;
+ OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+ try
+ {
+ doResolution(true, exceptionHandler);
+ doValidation(true, exceptionHandler);
+ doOpening(true, exceptionHandler);
+ doAttainState(exceptionHandler).addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, MoreExecutors.sameThreadExecutor()
+ );
+ }
+ catch (RuntimeException e)
+ {
+ exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ }
+ else
+ {
+ returnVal.set(null);
+ }
}
- }
+ });
+ return returnVal;
+
}
+
+
public void registerWithParents()
{
for(ConfiguredObject<?> parent : _parents.values())
@@ -475,7 +511,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private static class ChildCounter
+ private class ChildCounter
{
private final AtomicInteger _count = new AtomicInteger();
private final Runnable _task;
@@ -501,8 +537,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected final ListenableFuture<Void> closeChildren()
{
- LOGGER.debug("KWDEBUG closing children");
-
final SettableFuture<Void> returnVal = SettableFuture.create();
final ChildCounter counter = new ChildCounter(new Runnable()
{
@@ -510,6 +544,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public void run()
{
returnVal.set(null);
+ LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
}
});
counter.incrementCount();
@@ -521,7 +557,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public void performAction(final ConfiguredObject<?> child)
{
counter.incrementCount();
- ListenableFuture<Void> close = child.close();
+ ListenableFuture<Void> close = child.closeAsync();
close.addListener(new Runnable()
{
@Override
@@ -554,8 +590,15 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
@Override
- public final ListenableFuture<Void> close()
+ public void close()
+ {
+ doSync(closeAsync());
+ }
+
+ @Override
+ public final ListenableFuture<Void> closeAsync()
{
+ LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
final SettableFuture<Void> returnVal = SettableFuture.create();
@@ -577,6 +620,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
onClose();
unregister(false);
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
returnVal.set(null);
}
}, getTaskExecutor().getExecutor());
@@ -591,8 +635,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public void run()
{
+
onClose();
unregister(false);
+ LOGGER.debug("Closed "
+ + AbstractConfiguredObject.this.getClass().getSimpleName()
+ + " : "
+ + getName());
returnVal.set(null);
}
}, getTaskExecutor().getExecutor());
@@ -604,6 +653,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
else
{
+ LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
+
return Futures.immediateFuture(null);
}
}
@@ -619,48 +670,88 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void create()
{
- if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ doSync(createAsync());
+ }
+
+ public final ListenableFuture<Void> createAsync()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _taskExecutor.run(new VoidTask()
{
- final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
- if(currentUser != null)
- {
- String currentUserName = currentUser.getName();
- _attributes.put(LAST_UPDATED_BY, currentUserName);
- _attributes.put(CREATED_BY, currentUserName);
- _lastUpdatedBy = currentUserName;
- _createdBy = currentUserName;
- }
- final long currentTime = System.currentTimeMillis();
- _attributes.put(LAST_UPDATED_TIME, currentTime);
- _attributes.put(CREATED_TIME, currentTime);
- _lastUpdatedTime = currentTime;
- _createdTime = currentTime;
- CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
- try
- {
- doResolution(true, createExceptionHandler);
- doValidation(true, createExceptionHandler);
- validateOnCreate();
- registerWithParents();
- }
- catch(RuntimeException e)
+ @Override
+ public void execute()
{
- createExceptionHandler.handleException(e, this);
- }
- AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
- try
- {
- doCreation(true, unregisteringExceptionHandler);
- doOpening(true, unregisteringExceptionHandler);
- doAttainState(unregisteringExceptionHandler);
- }
- catch(RuntimeException e)
- {
- unregisteringExceptionHandler.handleException(e, this);
+ if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if (currentUser != null)
+ {
+ String currentUserName = currentUser.getName();
+ _attributes.put(LAST_UPDATED_BY, currentUserName);
+ _attributes.put(CREATED_BY, currentUserName);
+ _lastUpdatedBy = currentUserName;
+ _createdBy = currentUserName;
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _attributes.put(CREATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+ _createdTime = currentTime;
+
+ CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+ try
+ {
+ doResolution(true, createExceptionHandler);
+ doValidation(true, createExceptionHandler);
+ validateOnCreate();
+ registerWithParents();
+ }
+ catch (RuntimeException e)
+ {
+ createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ }
+
+ final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+ new CreateExceptionHandler(true);
+ try
+ {
+ doCreation(true, unregisteringExceptionHandler);
+ doOpening(true, unregisteringExceptionHandler);
+ Futures.addCallback(doAttainState(unregisteringExceptionHandler),
+ new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ returnVal.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ if (t instanceof RuntimeException)
+ {
+ unregisteringExceptionHandler.handleException((RuntimeException) t,
+ AbstractConfiguredObject.this);
+ }
+ returnVal.set(null);
+ }
+ },
+ getTaskExecutor().getExecutor());
+ }
+ catch (RuntimeException e)
+ {
+ unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+ returnVal.set(null);
+ }
+ }
}
- }
+ });
+
+ return returnVal;
}
protected void validateOnCreate()
@@ -710,8 +801,33 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
}
- private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+ private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ attainState().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ catch(RuntimeException e)
+ {
+ returnVal.set(null);
+ throw e;
+ }
+ }
+ });
+ counter.incrementCount();
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -719,22 +835,36 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (child instanceof AbstractConfiguredObject)
{
- AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+ final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
if (configuredObject._dynamicState.get() == DynamicState.OPENED)
{
- try
- {
- configuredObject.doAttainState(exceptionHandler);
- }
- catch (RuntimeException e)
- {
- exceptionHandler.handleException(e, configuredObject);
- }
+ counter.incrementCount();
+ Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(final Object result)
+ {
+ counter.decrementCount();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ if(t instanceof RuntimeException)
+ {
+ exceptionHandler.handleException((RuntimeException) t, configuredObject);
+ }
+ counter.decrementCount();
+ }
+ },getTaskExecutor().getExecutor());
+
}
}
}
});
- attainState();
+ counter.decrementCount();
+ return returnVal;
}
protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -990,16 +1120,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- private void attainStateIfOpenedOrReopenFailed()
+ private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
{
if (_openComplete || getDesiredState() == State.DELETED)
{
- attainState();
+ return attainState();
}
else if (_openFailed)
{
- open();
+ return openAsync();
}
+ return Futures.immediateFuture(null);
}
protected void onOpen()
@@ -1007,10 +1138,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- protected void attainState()
+ protected ListenableFuture<Void> attainState()
{
State currentState = getState();
State desiredState = getDesiredState();
+ ListenableFuture<Void> returnVal;
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -1018,7 +1150,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
try
{
- stateChangingMethod.invoke(this);
+ returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
}
catch (IllegalAccessException e)
{
@@ -1038,7 +1170,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
}
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
}
+ else
+ {
+ returnVal = Futures.immediateFuture(null);
+ }
+ return returnVal;
}
private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1113,46 +1254,93 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- private State setDesiredState(final State desiredState)
+ private ListenableFuture<Void> setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
- return runTask(new Task<State>()
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ runTask(new Task<Void>()
{
@Override
- public State execute()
+ public Void execute()
{
- State state = getState();
- if(desiredState == getDesiredState() && desiredState != state)
+ final State state = getState();
+ final State currentDesiredState = getDesiredState();
+ if(desiredState == currentDesiredState && desiredState != state)
{
- attainStateIfOpenedOrReopenFailed();
- final State currentState = getState();
- if (currentState != state)
+ attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
{
- notifyStateChanged(state, currentState);
+ @Override
+ public void run()
+ {
+ try
+ {
+ final State currentState = getState();
+ if (currentState != state)
+ {
+ notifyStateChanged(state, currentState);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
}
- return currentState;
+ ,_taskExecutor.getExecutor());
}
else
{
- authoriseSetDesiredState(desiredState);
+ try
+ {
+ authoriseSetDesiredState(desiredState);
+ validateChange(createProxyForValidation(Collections.<String, Object>singletonMap(
+ ConfiguredObject.DESIRED_STATE,
+ desiredState)), Collections.singleton(ConfiguredObject.DESIRED_STATE));
- setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
- desiredState));
+ if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+ {
+ attributeSet(ConfiguredObject.DESIRED_STATE,
+ currentDesiredState,
+ desiredState);
- if (getState() == desiredState)
- {
- notifyStateChanged(state, desiredState);
- return desiredState;
+ attainStateIfOpenedOrReopenFailed().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (getState() == desiredState)
+ {
+ notifyStateChanged(state, desiredState);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ }, _taskExecutor.getExecutor());
+ }
+ else
+ {
+ returnVal.set(null);
+ }
}
- else
+ catch (RuntimeException | Error e)
{
- return getState();
+ returnVal.set(null);
+ throw e;
}
+
}
+ return null;
}
});
+ return returnVal;
}
@Override
@@ -1531,20 +1719,67 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final void stop()
{
- setDesiredState(State.STOPPED);
+ doSync(setDesiredState(State.STOPPED));
}
public final void delete()
{
- if(getState() == State.UNINITIALIZED)
+ doSync(deleteAsync());
+ }
+
+ private void doSync(ListenableFuture<Void> async)
+ {
+ try
+ {
+ async.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+ }
+
+ public final ListenableFuture<Void> deleteAsync()
+ {
+ /* if(getState() == State.UNINITIALIZED)
{
_desiredState = State.DELETED;
}
- setDesiredState(State.DELETED);
+ */ return setDesiredState(State.DELETED);
}
- public final void start() { setDesiredState(State.ACTIVE); }
+ public final void start()
+ {
+ doSync(startAsync());
+ }
+
+ public ListenableFuture<Void> startAsync()
+ {
+ return setDesiredState(State.ACTIVE);
+ }
+
protected void deleted()
{
@@ -1629,19 +1864,49 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_taskExecutor.run(task);
}
+ @Override
+ public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ doSync(setAttributesAsync(attributes));
+ }
@Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
+ final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+ Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
runTask(new VoidTask()
{
@Override
public void execute()
{
authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
+ validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+ changeAttributes(updateAttributes);
}
});
+ if(desiredState != null)
+ {
+ State state;
+ if(desiredState instanceof State)
+ {
+ state = (State)desiredState;
+ }
+ else if(desiredState instanceof String)
+ {
+ state = State.valueOf((String)desiredState);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+ }
+ return setDesiredState(state);
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
}
protected void authoriseSetAttributes(final ConfiguredObject<?> proxyForValidation,
@@ -1652,7 +1917,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected void changeAttributes(final Map<String, Object> attributes)
{
- validateChange(createProxyForValidation(attributes), attributes.keySet());
Collection<String> names = getAttributeNames();
for (String name : names)
{
@@ -2193,7 +2457,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if (source.getState() != State.DELETED)
{
- source.delete();
+ // TODO - RG - This isn't right :-(
+ source.deleteAsync();
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
index 5bf5e337ad..f97d2dfe14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.model;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectDependency;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -59,6 +63,26 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf
return instance;
}
+
+ @Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ final SettableFuture<X> returnVal = SettableFuture.create();
+ final X instance = createInstance(attributes, parents);
+ final ListenableFuture<Void> createFuture = instance.createAsync();
+ createFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(instance);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ return returnVal;
+ }
+
protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents);
public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
index b421c5aaf1..c6ac7d4073 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
@@ -31,6 +31,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -194,11 +197,11 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
final EventLogger eventLogger = _eventLogger;
- EventLogger startupLogger;
+ final EventLogger startupLogger;
if (isStartupLoggedToSystemOut())
{
//Create the composite (logging+SystemOut MessageLogger to be used during startup
@@ -232,17 +235,34 @@ public abstract class AbstractSystemConfig<X extends SystemConfig<X>>
BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this);
upgrader.perform();
- Broker broker = getBroker();
+ final Broker broker = getBroker();
broker.setEventLogger(startupLogger);
- broker.open();
-
- if (broker.getState() == State.ACTIVE)
- {
- startupLogger.message(BrokerMessages.READY());
- broker.setEventLogger(eventLogger);
- }
-
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ broker.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ if (broker.getState() == State.ACTIVE)
+ {
+ startupLogger.message(BrokerMessages.READY());
+ broker.setEventLogger(eventLogger);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 944ed97ccc..c56698c60c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -45,5 +45,4 @@ public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getMatches();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 395cb52fcd..d2ab317f0e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -238,6 +238,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObject... otherParents);
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+ ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+
Class<? extends ConfiguredObject> getCategoryClass();
Class<? extends ConfiguredObject> getTypeClass();
@@ -250,8 +252,12 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
ConfiguredObjectRecord asObjectRecord();
void open();
+ ListenableFuture<Void> openAsync();
+
+ void close();
+ ListenableFuture<Void> closeAsync();
- ListenableFuture<Void> close();
+ ListenableFuture<Void> deleteAsync();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
index 7d4023862b..ed7c841344 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
@@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory
<X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
<X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 5026df0e19..82da0fd206 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
return factory.create(this, attributes, parents);
}
+
+ @Override
+ public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes);
+
+ return factory.createAsync(this, attributes, parents);
+ }
+
+
@Override
public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index d0c6fb041e..a93e6a602f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -40,6 +40,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.plugin.ConfiguredObjectRegistration;
@@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistry
{
if(m.isAnnotationPresent(StateTransition.class))
{
- if(m.getParameterTypes().length == 0)
+ if(ListenableFuture.class.isAssignableFrom(m.getReturnType()))
{
- m.setAccessible(true);
- StateTransition annotation = m.getAnnotation(StateTransition.class);
+ if (m.getParameterTypes().length == 0)
+ {
+ m.setAccessible(true);
+ StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+ for (State state : annotation.currentState())
+ {
+ addStateTransition(state, annotation.desiredState(), m, map);
+ }
- for(State state : annotation.currentState())
+ }
+ else
{
- addStateTransition(state, annotation.desiredState(), m, map);
+ throw new ServerScopedRuntimeException(
+ "A state transition method must have no arguments. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
-
}
else
{
- throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+ throw new ServerScopedRuntimeException(
+ "A state transition method must return a ListenableFuture. Method "
+ + m.getName()
+ + " on "
+ + clazz.getName()
+ + " does not meet this criteria.");
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index b28441438d..1c245363a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -103,7 +103,6 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
//children
Collection<Session> getSessions();
- void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 7318a58640..999a3594b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
@ManagedObject
public interface Port<X extends Port<X>> extends ConfiguredObject<X>
{
@@ -76,4 +78,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
void start();
+ ListenableFuture<Void> startAsync();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index cc758ba7c9..c2338c08d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -147,8 +147,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void stop();
- void delete();
-
String getRedirectHost(AmqpPort<?> port);
public static interface Transaction
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 28eea21093..dfbe8b12ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -35,6 +35,9 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
@@ -235,13 +238,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_parent.isManagementMode())
{
- _managementModeAuthenticationProvider.open();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ _managementModeAuthenticationProvider.openAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ activateWithoutManagementMode();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
+ else
+ {
+ activateWithoutManagementMode();
+ return Futures.immediateFuture(null);
+ }
+ }
+ private void activateWithoutManagementMode()
+ {
boolean hasBrokerAnyErroredChildren = false;
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index a4dbd7d5e5..8bcbba9ac4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -160,11 +160,28 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- asyncClose();
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncClose().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index 327b7ddfe9..67533f8244 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -32,6 +32,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -147,7 +150,8 @@ public class FileBasedGroupProviderImpl
GroupAdapter groupAdapter = new GroupAdapter(attrMap);
principals.add(groupAdapter);
groupAdapter.registerWithParents();
- groupAdapter.open();
+ // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread
+ groupAdapter.openAsync();
}
}
@@ -265,7 +269,7 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_groupDatabase != null)
{
@@ -282,29 +286,48 @@ public class FileBasedGroupProviderImpl
throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath()));
}
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- File file = new File(getPath());
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
-
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -377,9 +400,10 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -396,7 +420,8 @@ public class FileBasedGroupProviderImpl
attrMap.put(GroupMember.NAME, principal.getName());
GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap);
groupMemberAdapter.registerWithParents();
- groupMemberAdapter.open();
+ // todo - this will be safe, but the synchronous open should not be called from the management thread
+ groupMemberAdapter.openAsync();
members.add(groupMemberAdapter);
}
_groupPrincipal = new GroupPrincipal(getName());
@@ -459,12 +484,13 @@ public class FileBasedGroupProviderImpl
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
_groupDatabase.removeGroup(getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -522,19 +548,21 @@ public class FileBasedGroupProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
_groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index 7046f2973e..c95b3ab804 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -37,16 +37,17 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.type.TypeReference;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
if (_store != null)
{
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProviderImpl
{
throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() );
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProviderImpl
}
@StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
- private void doQuiesce()
+ private ListenableFuture<Void> doQuiesce()
{
if(_store != null)
{
_store.close();
}
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if(_store != null)
+ {
+ _store.close();
+ _store.delete();
+ deleted();
+ _authenticationProvider.setPreferencesProvider(null);
+
+ }
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- if(_store != null)
- {
- _store.close();
- _store.delete();
- deleted();
- _authenticationProvider.setPreferencesProvider(null);
+ return returnVal;
- }
- setState(State.DELETED);
}
@StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE )
- private void restart()
+ private ListenableFuture<Void> restart()
{
if (_store == null)
{
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProviderImpl
_store.open();
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 7c9b439e93..cb412e8d41 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 21827ffe58..5c53eed509 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -28,6 +28,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -228,14 +231,24 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
@StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setState(State.DELETED);
+ returnVal.set(null);
+
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -246,12 +259,14 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
setState(State.ERRORED);
throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e);
}
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
index 870621f292..5c3000db4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.port;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
index 0d16b4ffc7..cd0187034e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
@@ -20,19 +20,23 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.Map;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import java.util.Map;
-
public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable
{
Class<? super X> getCategoryClass();
X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
ConfiguredObjectRecord record,
ConfiguredObject<?>... parents);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0ba48387dd..664c544de4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -43,12 +43,15 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -650,16 +653,51 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
- public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- FilterManager filters,
+ public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet)
+ final EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
+ try
+ {
+ return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+ {
+ @Override
+ public QueueConsumerImpl execute()
+ throws Exception
+ {
+
+ return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+ }
+ });
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+ ExistingConsumerPreventsExclusive | RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ // Should never happen
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
+ }
+
+ private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+ FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<ConsumerImpl.Option> optionSet)
+ throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+ ExistingConsumerPreventsExclusive
+ {
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -771,7 +809,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
- filters,
+ filters,
messageClass,
optionSet);
@@ -820,7 +858,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
deliverAsync();
return consumer;
-
}
@Override
@@ -832,7 +869,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
- synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+ void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -843,7 +880,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (removed)
{
- consumer.close();
+ consumer.closeAsync();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
@@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
for (BindingImpl b : bindingCopy)
{
- b.delete();
+ // TODO - RG - Need to sort out bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_deleteTaskList.clear();
- close();
+ closeAsync();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return allowed;
}
- private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//=============
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_virtualHost.removeQueue(this);
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 4ffb868537..a5225f3aa4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -191,7 +191,7 @@ class QueueConsumerImpl
if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
- close();
+ closeAsync();
}
final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
index 19265ef453..b9ff6505fc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFa
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
index aa5f55dfb4..5f585f3d88 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
@@ -38,6 +38,9 @@ import java.util.Set;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -98,7 +101,7 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -113,12 +116,14 @@ public class FileKeyStoreImpl extends AbstractConfiguredObject<FileKeyStoreImpl>
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
index fb161fef4e..df1cbd0493 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
@@ -38,6 +38,9 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -98,7 +101,7 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -139,12 +142,14 @@ public class FileTrustStoreImpl extends AbstractConfiguredObject<FileTrustStoreI
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
index f6298ab383..c2779415d1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
@@ -57,6 +57,8 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.xml.bind.DatatypeConverter;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -184,7 +186,7 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -202,12 +204,14 @@ public class NonJavaKeyStoreImpl extends AbstractConfiguredObject<NonJavaKeyStor
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
index 993d689fb6..397e226699 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
@@ -45,6 +45,8 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.x500.X500Principal;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -172,7 +174,7 @@ public class NonJavaTrustStoreImpl
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -215,12 +217,14 @@ public class NonJavaTrustStoreImpl
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
index 7f98468726..9befcebe5b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
@@ -28,6 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -175,13 +178,14 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED )
- protected void startQuiesced()
+ protected ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -199,11 +203,11 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
throw e;
}
}
-
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
String providerName = getName();
@@ -219,15 +223,50 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
}
- close();
- if (_preferencesProvider != null)
- {
- _preferencesProvider.delete();
- }
- deleted();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- setState(State.DELETED);
+ final ListenableFuture<Void> future = closeAsync();
+ future.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (_preferencesProvider != null)
+ {
+ _preferencesProvider.deleteAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ else
+ {
+ try
+ {
+ deleted();
+
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
index 78b2b60fe9..7046fc4885 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
@@ -98,22 +98,15 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend
@Override
public void deleteUser(final String user) throws AccountNotFoundException
{
- runTask(new VoidTaskWithException<AccountNotFoundException>()
+ final ManagedUser authUser = getUser(user);
+ if(authUser != null)
{
- @Override
- public void execute() throws AccountNotFoundException
- {
- final ManagedUser authUser = getUser(user);
- if(authUser != null)
- {
- authUser.delete();
- }
- else
- {
- throw new AccountNotFoundException("No such user: '" + user + "'");
- }
- }
- });
+ authUser.delete();
+ }
+ else
+ {
+ throw new AccountNotFoundException("No such user: '" + user + "'");
+ }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
index c8884e15a8..6f36ec7d11 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
@@ -27,6 +27,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -96,10 +99,11 @@ class ManagedUser extends AbstractConfiguredObject<ManagedUser> implements User<
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_authenticationManager.getUserMap().remove(getName());
deleted();
+ return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index cf165ff4af..a4dbcdc284 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -40,6 +40,9 @@ import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
super.onOpen();
_principalDatabase = createDatabase();
initialise();
- List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
- for (Principal user : users)
- {
- PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
- principalAdapter.registerWithParents();
- principalAdapter.open();
- _userMap.put(user, principalAdapter);
- }
}
+
protected abstract PrincipalDatabase createDatabase();
@@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
return _principalDatabase;
}
+ @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
+ public ListenableFuture<Void> activate()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+ _userMap.clear();
+ if(!users.isEmpty())
+ {
+ for (final Principal user : users)
+ {
+ final PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
+ principalAdapter.registerWithParents();
+ principalAdapter.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _userMap.put(user, principalAdapter);
+ if (_userMap.size() == users.size())
+ {
+ setState(State.ACTIVE);
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+
+ }
+
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
- @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- public void doDelete()
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED)
+ public ListenableFuture<Void> doDelete()
{
File file = new File(_path);
if (file.exists() && file.isFile())
@@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -465,13 +497,14 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -489,7 +522,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
LOGGER.warn("Failed to delete user " + _user, e);
}
-
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
index 98607d2490..96d32f4179 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.security.group;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Group;
@@ -77,16 +80,18 @@ public class GroupImpl extends AbstractConfiguredObject<GroupImpl> implements Gr
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
index ea17db6ce7..a86d380b2b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.group;
import java.security.Principal;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupMember;
@@ -61,15 +64,17 @@ public class GroupMemberImpl extends AbstractConfiguredObject<GroupMemberImpl> i
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
index ecc166f8fc..7dc032cc90 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
@@ -26,6 +26,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -89,16 +92,18 @@ public class GroupProviderImpl extends AbstractConfiguredObject<GroupProviderImp
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 4329f000ec..4a2d30ac9f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -38,11 +38,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -388,27 +392,65 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return isStoreEmptyHandler.isEmpty();
}
- protected void createDefaultExchanges()
+ protected ListenableFuture<Void> createDefaultExchanges()
{
- Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>()
{
+ private static final int TOTAL_STANDARD_EXCHANGES = 4;
+ private final AtomicInteger _createdExchangeCount = new AtomicInteger();
+ private SettableFuture<Void> _future = SettableFuture.create();
+
@Override
- public Void run()
+ public ListenableFuture<Void> run()
{
addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- return null;
+ return _future;
+ }
+
+ private void standardExchangeCreated()
+ {
+ if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES)
+ {
+ _future.set(null);
+ }
}
- void addStandardExchange(String name, String type)
+ ListenableFuture<Void> addStandardExchange(String name, String type)
{
+
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, type);
attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
- childAdded(addExchange(attributes));
+ final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ Futures.addCallback(future, new FutureCallback<ExchangeImpl>()
+ {
+ @Override
+ public void onSuccess(final ExchangeImpl result)
+ {
+ try
+ {
+ childAdded(result);
+ }
+ finally
+ {
+ standardExchangeCreated();
+ }
+
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ standardExchangeCreated();
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
});
}
@@ -777,6 +819,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
+ private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes)
+ throws ExchangeExistsException, ReservedExchangeNameException,
+ NoFactoryForTypeException
+ {
+ try
+ {
+ ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this);
+ return result;
+ }
+ catch (DuplicateNameException e)
+ {
+ throw new ExchangeExistsException(getExchange(e.getName()));
+ }
+
+ }
+
+
@Override
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
@@ -809,7 +868,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
protected ListenableFuture<Void> beforeClose()
{
- _logger.debug("KWDEBUG setting state to UNAVAILABLE");
setState(State.UNAVAILABLE);
return super.beforeClose();
@@ -818,7 +876,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
protected void onClose()
{
- _logger.debug("KWDEBUG onClose");
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
@@ -830,7 +887,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private void closeMessageStore()
{
- _logger.debug("KWDEBUG closeMessageStore");
if (getMessageStore() != null)
{
try
@@ -1312,38 +1368,76 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- // TODO - need to deal with async close children
- closeChildren();
- shutdownHouseKeeping();
- closeMessageStore();
- setState(State.STOPPED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeChildren().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ shutdownHouseKeeping();
+ closeMessageStore();
+ setState(State.STOPPED);
+
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
String hostName = getName();
- close();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageStore ms = getMessageStore();
+ if (ms != null)
+ {
+ try
+ {
+ ms.onDelete(AbstractVirtualHost.this);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occurred on message store deletion", e);
+ }
+ }
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
- MessageStore ms = getMessageStore();
- if (ms != null)
- {
- try
- {
- ms.onDelete(this);
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on message store deletion", e);
- }
- }
- deleted();
- setState(State.DELETED);
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -1532,7 +1626,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
- private void onActivate()
+ private ListenableFuture<Void> onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory());
@@ -1552,9 +1646,28 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (isStoreEmpty())
{
- createDefaultExchanges();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ createDefaultExchanges().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ postCreateDefaultExchangeTasks();
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
+ else
+ {
+ postCreateDefaultExchangeTasks();
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ private void postCreateDefaultExchangeTasks()
+ {
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
_messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1589,9 +1702,32 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker);
}
}
+ private static class ChildCounter
+ {
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
@StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- private void onRestart()
+ private ListenableFuture<Void> onRestart()
{
resetStatistics();
@@ -1622,6 +1758,25 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
new GenericRecoverer(this).recover(records);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onActivate().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ }
+ });
+ counter.incrementCount();
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
{
@Override
@@ -1632,14 +1787,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public void performAction(final ConfiguredObject<?> object)
{
- object.open();
+ counter.incrementCount();
+ object.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ counter.decrementCount();
+ }
+ }, getTaskExecutor().getExecutor());
}
});
return null;
}
});
-
- onActivate();
+ counter.decrementCount();
+ return returnVal;
}
private class FileSystemSpaceChecker extends HouseKeepingTask
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index 03c30a9cd4..fd73963b68 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -29,6 +29,8 @@ import java.util.Map;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -68,7 +70,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -107,15 +109,21 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
if (host != null)
{
final VirtualHost<?,?,?> recoveredHost = host;
- Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- recoveredHost.open();
- return null;
- }
- });
+ final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+ new PrivilegedAction<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> run()
+ {
+ return recoveredHost.openAsync();
+
+ }
+ });
+ return openFuture;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index ba915e3427..bccf284b34 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -38,7 +38,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -122,16 +125,47 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
@StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
try
{
- activate();
- setState(State.ACTIVE);
+ Futures.addCallback(activate(),
+ new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ try
+ {
+ setState(State.ACTIVE);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+
+ setState(State.ERRORED);
+ returnVal.set(null);
+ if (_broker.isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", t);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
}
catch(RuntimeException e)
{
setState(State.ERRORED);
+ returnVal.set(null);
if (_broker.isManagementMode())
{
LOGGER.warn("Failed to make " + this + " active.", e);
@@ -141,6 +175,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
throw e;
}
}
+ return returnVal;
}
@Override
@@ -183,40 +218,73 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
+ final SettableFuture<Void> returnVal = SettableFuture.create();
setState(State.DELETED);
deleteVirtualHostIfExists();
- final ListenableFuture<Void> closeFuture = close();
- deleted();
- DurableConfigurationStore configurationStore = getConfigurationStore();
- if (configurationStore != null)
+ final ListenableFuture<Void> closeFuture = closeAsync();
+ closeFuture.addListener(new Runnable()
{
- configurationStore.onDelete(this);
- }
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ DurableConfigurationStore configurationStore = getConfigurationStore();
+ if (configurationStore != null)
+ {
+ configurationStore.onDelete(AbstractVirtualHostNode.this);
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
+
}
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
if (virtualHost != null)
{
- virtualHost.delete();
+ return virtualHost.deleteAsync();
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- stopAndSetStateTo(State.STOPPED);
+ return stopAndSetStateTo(State.STOPPED);
}
- protected void stopAndSetStateTo(State stoppedState)
+ protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState)
{
- // TODO - deal with async close children
- closeChildren();
- closeConfigurationStoreSafely();
- setState(stoppedState);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeConfigurationStoreSafely();
+ setState(stoppedState);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
@Override
@@ -311,7 +379,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
protected abstract DurableConfigurationStore createConfigurationStore();
- protected abstract void activate();
+ protected abstract ListenableFuture<Void> activate();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
index c94d113514..8a160f83d7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeImpl
}
@StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
try
{
@@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeImpl
throw e;
}
}
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 889984eb67..b2d35f690f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -151,6 +151,7 @@ public class HeadersBindingTest extends TestCase
_count++;
_queue = mock(AMQQueue.class);
TaskExecutor executor = new CurrentThreadTaskExecutor();
+ executor.start();
VirtualHostImpl vhost = mock(VirtualHostImpl.class);
when(_queue.getVirtualHost()).thenReturn(vhost);
when(_queue.getModel()).thenReturn(BrokerModel.getInstance());
@@ -158,6 +159,7 @@ public class HeadersBindingTest extends TestCase
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
final EventLogger eventLogger = new EventLogger();
when(vhost.getEventLogger()).thenReturn(eventLogger);
+ when(vhost.getTaskExecutor()).thenReturn(executor);
_exchange = mock(ExchangeImpl.class);
when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
when(_exchange.getEventLogger()).thenReturn(eventLogger);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index c21a386eaa..26db573e41 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.model;
import static java.util.Arrays.asList;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -33,12 +34,15 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -413,7 +418,30 @@ public class VirtualHostTest extends QpidTestCase
private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
{
final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+ final List<Action<?>> tasks = new ArrayList<>();
+ final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class);
+ Answer answer = new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ return tasks.add(deleteTaskCaptor.getValue());
+ }
+ };
+ doAnswer(answer).when(connection).addDeleteTask(deleteTaskCaptor.capture());
when(connection.getVirtualHost()).thenReturn(virtualHost);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ for(Action action : tasks)
+ {
+ action.performAction(connection);
+ }
+ return null;
+ }
+ }).when(connection).closeAsync(any(AMQConstant.class),anyString());
when(connection.getRemoteAddressString()).thenReturn("peer:1234");
return connection;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
index 0b35ba9330..5c91052956 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
@@ -26,6 +26,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -127,15 +130,17 @@ public class TestConfiguredObject extends AbstractConfiguredObject
}
@StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
public boolean isOpened()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index c1a9240f2c..7667267df3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -31,14 +31,12 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import org.apache.log4j.Logger;
import org.mockito.ArgumentCaptor;
@@ -56,7 +54,6 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
index 889097f850..74fe371b2f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
@@ -22,17 +22,13 @@ package org.apache.qpid.server.virtualhost;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doThrow;
-import java.io.File;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -50,7 +46,6 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.verification.VerificationMode;
public class AbstractVirtualHostTest extends QpidTestCase
{
@@ -90,7 +85,7 @@ public class AbstractVirtualHostTest extends QpidTestCase
{
if (_taskExecutor != null)
{
- _taskExecutor.stopImmediately();
+ _taskExecutor.stop();
}
}
finally
@@ -179,7 +174,7 @@ public class AbstractVirtualHostTest extends QpidTestCase
verify(store, times(0)).closeMessageStore();
}
- public void testDeleteInErrorStateAfterOpen()
+ public void testDeleteInErrorStateAfterOpen() throws Exception
{
Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName());
AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
index b17f383217..4f8fe097ab 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
@@ -35,6 +35,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -560,8 +563,9 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
index 00b42094b1..c1bd1b0bb8 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
@@ -26,6 +26,9 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -152,7 +155,7 @@ public class ACLFileAccessControlProviderImpl
@StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
if(_broker.isManagementMode())
@@ -180,6 +183,7 @@ public class ACLFileAccessControlProviderImpl
}
}
}
+ return Futures.immediateFuture(null);
}
@Override
@@ -193,17 +197,36 @@ public class ACLFileAccessControlProviderImpl
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
- private void startQuiesced()
+ private ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- close();
- setState(State.DELETED);
- deleted();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ closeAsync().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ setState(State.DELETED);
+ deleted();
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
}
@Override
diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
index a34ac16e80..2a691b3652 100644
--- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
+++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.Broker;
@@ -54,7 +55,9 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase
when(_broker.getObjectFactory()).thenReturn(_objectFactory);
when(_broker.getModel()).thenReturn(_objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
- when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
+ TaskExecutor taskExecutor = new CurrentThreadTaskExecutor();
+ taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
}
public void testCreateInstanceWhenAclFileIsNotPresent()
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 62a95e9869..28d8a6c88c 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -125,9 +125,8 @@ class ManagementNodeConsumer implements ConsumerImpl
}
@Override
- public ListenableFuture<Void> close()
+ public void close()
{
- return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 69920ff488..1a85a24e0b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -39,6 +39,8 @@ import javax.servlet.DispatcherType;
import javax.servlet.MultipartConfigElement;
import javax.servlet.http.HttpServletRequest;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -130,7 +132,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void doStart()
+ private ListenableFuture<Void> doStart()
{
getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
@@ -148,6 +150,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
@@ -206,7 +209,9 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
if(port.getState() != State.ACTIVE)
{
- port.start();
+
+ // TODO - RG
+ port.startAsync();
}
Connector connector = null;
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
index 52d7ba33a3..4327292336 100644
--- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
@@ -58,7 +59,9 @@ public class HttpManagementTest extends QpidTestCase
when(_broker.getModel()).thenReturn(objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class));
- when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
+ TaskExecutor taskExecutor = new TaskExecutorImpl();
+ taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false);
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index 6c962c2901..06558b9f9a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -32,6 +32,8 @@ import java.util.Set;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -105,7 +107,7 @@ public class JMXManagementPluginImpl
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void doStart() throws JMException, IOException
+ private ListenableFuture<Void> doStart() throws JMException, IOException
{
_allowPortActivation = true;
Broker<?> broker = getBroker();
@@ -125,7 +127,8 @@ public class JMXManagementPluginImpl
registryPort.setPortManager(this);
if(port.getState() != State.ACTIVE)
{
- port.start();
+ // TODO - RG
+ port.startAsync();
}
}
@@ -135,7 +138,7 @@ public class JMXManagementPluginImpl
connectorPort.setPortManager(this);
if(port.getState() != State.ACTIVE)
{
- port.start();
+ port.startAsync();
}
}
@@ -175,6 +178,7 @@ public class JMXManagementPluginImpl
_objectRegistry.start();
setState(State.ACTIVE);
_allowPortActivation = false;
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 32de06186a..b243769b32 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -386,7 +386,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
assertEquals("Incorrect number of exchanges registered after second recovery",
origExchangeCount, _virtualHost.getExchanges().size());
assertNull("Durable exchange was not removed:" + directExchangeName,
- _virtualHost.getExchange(directExchangeName));
+ _virtualHost.getExchange(directExchangeName));
}
/**
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java
index 4d06c7b624..4ca3b2ba5c 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java
@@ -36,7 +36,6 @@ public class LogRecordsRestTest extends QpidRestTestCase
assertNotNull("Message id cannot be null", record.get("id"));
assertNotNull("Message timestamp cannot be null", record.get("timestamp"));
assertEquals("Unexpected log level", "INFO", record.get("level"));
- assertEquals("Unexpected thread", "main", record.get("thread"));
assertEquals("Unexpected logger", "qpid.message.broker.ready", record.get("logger"));
}