summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-05 14:57:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-05 14:57:46 +0000
commit10b21b20fbd892d19ae64084165ec8942f864eac (patch)
treeaa38261850b1b7d70b73e3bf7e5423d04c6b4afb
parentf3ed8aa6f4a8ede937578e3e06040fb9e121e47a (diff)
downloadqpid-python-10b21b20fbd892d19ae64084165ec8942f864eac.tar.gz
rewrite close
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1664366 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/pom.xml8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java173
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java14
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java8
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes1
14 files changed, 169 insertions, 101 deletions
diff --git a/qpid/java/broker-core/pom.xml b/qpid/java/broker-core/pom.xml
index 516ac9a4c4..e8217c89e3 100644
--- a/qpid/java/broker-core/pom.xml
+++ b/qpid/java/broker-core/pom.xml
@@ -107,8 +107,14 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava-version}</version>
+ </dependency>
+
<!-- test dependencies -->
- <dependency>
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
<version>${project.version}</version>
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index e88763dd1d..8c389e6d22 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -29,10 +29,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
@@ -54,7 +57,6 @@ import org.apache.qpid.server.plugin.PluggableFactoryLoader;
import org.apache.qpid.server.plugin.SystemConfigFactory;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.FutureResult;
public class Broker implements BrokerShutdownProvider
{
@@ -108,13 +110,13 @@ public class Broker implements BrokerShutdownProvider
{
if(_systemConfig != null)
{
- final FutureResult closeResult = _systemConfig.close();
- closeResult.waitForCompletion(5000l);
+ ListenableFuture<Void> closeResult = _systemConfig.close();
+ closeResult.get(5000l, TimeUnit.MILLISECONDS);
}
_taskExecutor.stop();
}
- catch (TimeoutException e)
+ catch (TimeoutException | InterruptedException | ExecutionException e)
{
LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index e0c03fe822..8d572189b3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
<T> Future<T> submit(Task<T> task) throws CancellationException;
+ boolean isTaskExecutorThread();
+
+ Executor getExecutor();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 96e4e256b2..fecb4de7f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -277,7 +278,13 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private boolean isTaskExecutorThread()
+ @Override
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
+
+ public boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 52fcf07e25..83784d4b25 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -22,9 +22,10 @@ package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
public interface ConsumerImpl
{
@@ -66,7 +67,7 @@ public interface ConsumerImpl
boolean seesRequeues();
- FutureResult close();
+ ListenableFuture<Void> close();
boolean trySendLock();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 86d0b07e16..2269999e1d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -44,10 +44,15 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -470,18 +475,66 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- protected FutureResult closeChildren()
+ private static class ChildCounter
{
- final List<FutureResult> childCloseFutures = new ArrayList<>();
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
+
+ protected final ListenableFuture<Void> closeChildren()
+ {
+ LOGGER.debug("KWDEBUG closing children");
+
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ });
+ counter.incrementCount();
+
+
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- childCloseFutures.add(child.close());
+ counter.incrementCount();
+ ListenableFuture<Void> close = child.close();
+ close.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ counter.decrementCount();
+ }
+ }, MoreExecutors.sameThreadExecutor());
}
});
+ counter.decrementCount();
+
for(Collection<ConfiguredObject<?>> childList : _children.values())
{
childList.clear();
@@ -497,101 +550,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
childNameMap.clear();
}
-
- FutureResult futureResult;
- if(childCloseFutures.isEmpty())
- {
- futureResult = FutureResult.IMMEDIATE_FUTURE;
- }
- else
- {
- futureResult = new FutureResult()
- {
- @Override
- public boolean isComplete()
- {
- for(FutureResult childResult : childCloseFutures)
- {
- if(!childResult.isComplete())
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void waitForCompletion()
- {
- for(FutureResult childResult : childCloseFutures)
- {
- childResult.waitForCompletion();
- }
- }
-
-
- @Override
- public void waitForCompletion(long timeout) throws TimeoutException
- {
- long startTime = System.currentTimeMillis();
- long remaining = timeout;
- for(FutureResult childResult : childCloseFutures)
- {
-
- childResult.waitForCompletion(remaining);
- remaining = startTime + timeout - System.currentTimeMillis();
- if(remaining < 0)
- {
- throw new TimeoutException("Completion did not occur within specified timeout: " + timeout);
- }
- }
- }
- };
- }
- return futureResult;
+ return returnVal;
}
@Override
- public final FutureResult close()
+ public final ListenableFuture<Void> close()
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
- final CloseResult closeResult = new CloseResult();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- CloseFuture close = beforeClose();
+ final ListenableFuture<Void> beforeClose = beforeClose();
- Runnable closeRunnable = new Runnable()
+ if(beforeClose != null)
{
- @Override
- public void run()
+ beforeClose.addListener(new Runnable()
{
- final FutureResult result = closeChildren();
- closeResult.setChildFutureResult(result);
- onClose();
- unregister(false);
-
- }
- };
-
- if (close == null)
- {
- closeRunnable.run();
+ @Override
+ public void run()
+ {
+ final ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ }, getTaskExecutor().getExecutor());
}
else
{
- close.runWhenComplete(closeRunnable);
+ final ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
}
- // if future not complete, schedule the remainder to be done once complete.
- return closeResult;
+ return returnVal;
+
+
}
else
{
- return FutureResult.IMMEDIATE_FUTURE;
+ return Futures.immediateFuture(null);
}
}
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
return null;
}
@@ -2013,6 +2030,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
_childFutureResult.waitForCompletion();
+
}
@Override
@@ -2042,6 +2060,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
_childFutureResult.waitForCompletion(remaining);
+
}
public synchronized void setChildFutureResult(final FutureResult childFutureResult)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index bfe9c8b15d..395cb52fcd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -26,9 +26,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.util.FutureResult;
@ManagedObject( creatable = false, category = false )
/**
@@ -250,7 +251,7 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
void open();
- FutureResult close();
+ ListenableFuture<Void> close();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 0cbb80d722..a4dbd7d5e5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
@@ -166,25 +168,24 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@Override
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing.set(true);
- final ConnectionCloseFuture closeFuture = asyncClose();
+ return asyncClose();
- return closeFuture;
}
- private ConnectionCloseFuture asyncClose()
+ private ListenableFuture<Void> asyncClose()
{
- final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture();
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
_underlyingConnection.addDeleteTask(new Action()
{
@Override
public void performAction(final Object object)
{
- closeFuture.connectionClosed();
+ closeFuture.set(null);
}
});
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 639d569e8f..0ba48387dd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -97,6 +97,7 @@ import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -823,7 +824,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- protected org.apache.qpid.server.model.CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing = true;
return super.beforeClose();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 06917f0161..4329f000ec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -62,6 +63,7 @@ import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
@@ -805,15 +807,18 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
+ _logger.debug("KWDEBUG setting state to UNAVAILABLE");
setState(State.UNAVAILABLE);
- return null;
+
+ return super.beforeClose();
}
@Override
protected void onClose()
{
+ _logger.debug("KWDEBUG onClose");
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
@@ -825,6 +830,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private void closeMessageStore()
{
+ _logger.debug("KWDEBUG closeMessageStore");
if (getMessageStore() != null)
{
try
@@ -1308,6 +1314,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected void doStop()
{
+ // TODO - need to deal with async close children
closeChildren();
shutdownHouseKeeping();
closeMessageStore();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index bcfd0ff951..ba915e3427 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -186,7 +187,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
{
setState(State.DELETED);
deleteVirtualHostIfExists();
- close();
+ final ListenableFuture<Void> closeFuture = close();
deleted();
DurableConfigurationStore configurationStore = getConfigurationStore();
if (configurationStore != null)
@@ -212,6 +213,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
protected void stopAndSetStateTo(State stoppedState)
{
+ // TODO - deal with async close children
closeChildren();
closeConfigurationStoreSafely();
setState(stoppedState);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
index 4343419505..26645722c9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
@@ -22,11 +22,14 @@ package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class CurrentThreadTaskExecutor implements TaskExecutor
{
private final AtomicReference<Thread> _thread = new AtomicReference<>();
@@ -144,4 +147,15 @@ public class CurrentThreadTaskExecutor implements TaskExecutor
return executor;
}
+ @Override
+ public boolean isTaskExecutorThread()
+ {
+ return true;
+ }
+
+ @Override
+ public Executor getExecutor()
+ {
+ return MoreExecutors.sameThreadExecutor();
+ }
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 701c704fb6..62a95e9869 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -24,12 +24,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.StateChangeListener;
class ManagementNodeConsumer implements ConsumerImpl
@@ -123,9 +125,9 @@ class ManagementNodeConsumer implements ConsumerImpl
}
@Override
- public FutureResult close()
+ public ListenableFuture<Void> close()
{
- return FutureResult.IMMEDIATE_FUTURE;
+ return Futures.immediateFuture(null);
}
@Override
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 70056d6968..3397c7ff47 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -47,6 +47,7 @@ org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemo
org.apache.qpid.server.store.berkeleydb.*
org.apache.qpid.server.store.berkeleydb.replication.*
org.apache.qpid.server.store.berkeleydb.upgrade.*
+org.apache.qpid.server.virtualhostnode.berkeleydb.*
org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart