diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-05 14:57:46 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-05 14:57:46 +0000 |
commit | 10b21b20fbd892d19ae64084165ec8942f864eac (patch) | |
tree | aa38261850b1b7d70b73e3bf7e5423d04c6b4afb | |
parent | f3ed8aa6f4a8ede937578e3e06040fb9e121e47a (diff) | |
download | qpid-python-10b21b20fbd892d19ae64084165ec8942f864eac.tar.gz |
rewrite close
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1664366 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 169 insertions, 101 deletions
diff --git a/qpid/java/broker-core/pom.xml b/qpid/java/broker-core/pom.xml index 516ac9a4c4..e8217c89e3 100644 --- a/qpid/java/broker-core/pom.xml +++ b/qpid/java/broker-core/pom.xml @@ -107,8 +107,14 @@ </exclusions> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava-version}</version> + </dependency> + <!-- test dependencies --> - <dependency> + <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-test-utils</artifactId> <version>${project.version}</version> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index e88763dd1d..8c389e6d22 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -29,10 +29,13 @@ import java.security.PrivilegedExceptionAction; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.security.auth.Subject; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; @@ -54,7 +57,6 @@ import org.apache.qpid.server.plugin.PluggableFactoryLoader; import org.apache.qpid.server.plugin.SystemConfigFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.FutureResult; public class Broker implements BrokerShutdownProvider { @@ -108,13 +110,13 @@ public class Broker implements BrokerShutdownProvider { if(_systemConfig != null) { - final FutureResult closeResult = _systemConfig.close(); - closeResult.waitForCompletion(5000l); + ListenableFuture<Void> closeResult = _systemConfig.close(); + closeResult.get(5000l, TimeUnit.MILLISECONDS); } _taskExecutor.stop(); } - catch (TimeoutException e) + catch (TimeoutException | InterruptedException | ExecutionException e) { LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately"); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index e0c03fe822..8d572189b3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.configuration.updater; import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; public interface TaskExecutor @@ -43,4 +44,7 @@ public interface TaskExecutor <T> Future<T> submit(Task<T> task) throws CancellationException; + boolean isTaskExecutorThread(); + + Executor getExecutor(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index 96e4e256b2..fecb4de7f5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -277,7 +278,13 @@ public class TaskExecutorImpl implements TaskExecutor } } - private boolean isTaskExecutorThread() + @Override + public Executor getExecutor() + { + return _executor; + } + + public boolean isTaskExecutorThread() { return Thread.currentThread() == _taskThread; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 52fcf07e25..83784d4b25 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -22,9 +22,10 @@ package org.apache.qpid.server.consumer; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.util.FutureResult; public interface ConsumerImpl { @@ -66,7 +67,7 @@ public interface ConsumerImpl boolean seesRequeues(); - FutureResult close(); + ListenableFuture<Void> close(); boolean trySendLock(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 86d0b07e16..2269999e1d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -44,10 +44,15 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -470,18 +475,66 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - protected FutureResult closeChildren() + private static class ChildCounter { - final List<FutureResult> childCloseFutures = new ArrayList<>(); + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } + + protected final ListenableFuture<Void> closeChildren() + { + LOGGER.debug("KWDEBUG closing children"); + + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }); + counter.incrementCount(); + + applyToChildren(new Action<ConfiguredObject<?>>() { @Override public void performAction(final ConfiguredObject<?> child) { - childCloseFutures.add(child.close()); + counter.incrementCount(); + ListenableFuture<Void> close = child.close(); + close.addListener(new Runnable() + { + @Override + public void run() + { + counter.decrementCount(); + } + }, MoreExecutors.sameThreadExecutor()); } }); + counter.decrementCount(); + for(Collection<ConfiguredObject<?>> childList : _children.values()) { childList.clear(); @@ -497,101 +550,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im childNameMap.clear(); } - - FutureResult futureResult; - if(childCloseFutures.isEmpty()) - { - futureResult = FutureResult.IMMEDIATE_FUTURE; - } - else - { - futureResult = new FutureResult() - { - @Override - public boolean isComplete() - { - for(FutureResult childResult : childCloseFutures) - { - if(!childResult.isComplete()) - { - return false; - } - } - return true; - } - - @Override - public void waitForCompletion() - { - for(FutureResult childResult : childCloseFutures) - { - childResult.waitForCompletion(); - } - } - - - @Override - public void waitForCompletion(long timeout) throws TimeoutException - { - long startTime = System.currentTimeMillis(); - long remaining = timeout; - for(FutureResult childResult : childCloseFutures) - { - - childResult.waitForCompletion(remaining); - remaining = startTime + timeout - System.currentTimeMillis(); - if(remaining < 0) - { - throw new TimeoutException("Completion did not occur within specified timeout: " + timeout); - } - } - } - }; - } - return futureResult; + return returnVal; } @Override - public final FutureResult close() + public final ListenableFuture<Void> close() { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { - final CloseResult closeResult = new CloseResult(); + final SettableFuture<Void> returnVal = SettableFuture.create(); - CloseFuture close = beforeClose(); + final ListenableFuture<Void> beforeClose = beforeClose(); - Runnable closeRunnable = new Runnable() + if(beforeClose != null) { - @Override - public void run() + beforeClose.addListener(new Runnable() { - final FutureResult result = closeChildren(); - closeResult.setChildFutureResult(result); - onClose(); - unregister(false); - - } - }; - - if (close == null) - { - closeRunnable.run(); + @Override + public void run() + { + final ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + onClose(); + unregister(false); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + } + }, getTaskExecutor().getExecutor()); } else { - close.runWhenComplete(closeRunnable); + final ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + onClose(); + unregister(false); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); } - // if future not complete, schedule the remainder to be done once complete. - return closeResult; + return returnVal; + + } else { - return FutureResult.IMMEDIATE_FUTURE; + return Futures.immediateFuture(null); } } - protected CloseFuture beforeClose() + protected ListenableFuture<Void> beforeClose() { return null; } @@ -2013,6 +2030,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } _childFutureResult.waitForCompletion(); + } @Override @@ -2042,6 +2060,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } _childFutureResult.waitForCompletion(remaining); + } public synchronized void setChildFutureResult(final FutureResult childFutureResult) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index bfe9c8b15d..395cb52fcd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -26,9 +26,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.util.FutureResult; @ManagedObject( creatable = false, category = false ) /** @@ -250,7 +251,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> void open(); - FutureResult close(); + ListenableFuture<Void> close(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 0cbb80d722..a4dbd7d5e5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; @@ -166,25 +168,24 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @Override - protected CloseFuture beforeClose() + protected ListenableFuture<Void> beforeClose() { _closing.set(true); - final ConnectionCloseFuture closeFuture = asyncClose(); + return asyncClose(); - return closeFuture; } - private ConnectionCloseFuture asyncClose() + private ListenableFuture<Void> asyncClose() { - final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture(); + final SettableFuture<Void> closeFuture = SettableFuture.create(); _underlyingConnection.addDeleteTask(new Action() { @Override public void performAction(final Object object) { - closeFuture.connectionClosed(); + closeFuture.set(null); } }); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 639d569e8f..0ba48387dd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; @@ -52,7 +53,6 @@ import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; @@ -97,6 +97,7 @@ import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.TransportException; public abstract class AbstractQueue<X extends AbstractQueue<X>> @@ -823,7 +824,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - protected org.apache.qpid.server.model.CloseFuture beforeClose() + protected ListenableFuture<Void> beforeClose() { _closing = true; return super.beforeClose(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 06917f0161..4329f000ec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -62,6 +63,7 @@ import org.apache.qpid.server.message.MessageNode; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; @@ -805,15 +807,18 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected CloseFuture beforeClose() + protected ListenableFuture<Void> beforeClose() { + _logger.debug("KWDEBUG setting state to UNAVAILABLE"); setState(State.UNAVAILABLE); - return null; + + return super.beforeClose(); } @Override protected void onClose() { + _logger.debug("KWDEBUG onClose"); //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); @@ -825,6 +830,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private void closeMessageStore() { + _logger.debug("KWDEBUG closeMessageStore"); if (getMessageStore() != null) { try @@ -1308,6 +1314,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) protected void doStop() { + // TODO - need to deal with async close children closeChildren(); shutdownHouseKeeping(); closeMessageStore(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index bcfd0ff951..ba915e3427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -186,7 +187,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< { setState(State.DELETED); deleteVirtualHostIfExists(); - close(); + final ListenableFuture<Void> closeFuture = close(); deleted(); DurableConfigurationStore configurationStore = getConfigurationStore(); if (configurationStore != null) @@ -212,6 +213,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< protected void stopAndSetStateTo(State stoppedState) { + // TODO - deal with async close children closeChildren(); closeConfigurationStoreSafely(); setState(stoppedState); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java index 4343419505..26645722c9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java @@ -22,11 +22,14 @@ package org.apache.qpid.server.configuration.updater; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.util.concurrent.MoreExecutors; + public class CurrentThreadTaskExecutor implements TaskExecutor { private final AtomicReference<Thread> _thread = new AtomicReference<>(); @@ -144,4 +147,15 @@ public class CurrentThreadTaskExecutor implements TaskExecutor return executor; } + @Override + public boolean isTaskExecutorThread() + { + return true; + } + + @Override + public Executor getExecutor() + { + return MoreExecutors.sameThreadExecutor(); + } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 701c704fb6..62a95e9869 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -24,12 +24,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.StateChangeListener; class ManagementNodeConsumer implements ConsumerImpl @@ -123,9 +125,9 @@ class ManagementNodeConsumer implements ConsumerImpl } @Override - public FutureResult close() + public ListenableFuture<Void> close() { - return FutureResult.IMMEDIATE_FUTURE; + return Futures.immediateFuture(null); } @Override diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 70056d6968..3397c7ff47 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -47,6 +47,7 @@ org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemo org.apache.qpid.server.store.berkeleydb.* org.apache.qpid.server.store.berkeleydb.replication.* org.apache.qpid.server.store.berkeleydb.upgrade.* +org.apache.qpid.server.virtualhostnode.berkeleydb.* org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart |