summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java49
2 files changed, 51 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index e0c03fe822..8d572189b3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
<T> Future<T> submit(Task<T> task) throws CancellationException;
+ boolean isTaskExecutorThread();
+
+ Executor getExecutor();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 96e4e256b2..0f59494850 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -48,6 +49,7 @@ public class TaskExecutorImpl implements TaskExecutor
private volatile Thread _taskThread;
private final AtomicBoolean _running = new AtomicBoolean();
private volatile ExecutorService _executor;
+ private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
@Override
@@ -67,7 +69,7 @@ public class TaskExecutorImpl implements TaskExecutor
@Override
public Thread newThread(Runnable r)
{
- _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+ _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this);
return _taskThread;
}
});
@@ -277,7 +279,13 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private boolean isTaskExecutorThread()
+ @Override
+ public Executor getExecutor()
+ {
+ return _wrappedExecutor;
+ }
+
+ public boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
@@ -373,4 +381,41 @@ public class TaskExecutorImpl implements TaskExecutor
return get();
}
}
+
+ private class ImmediateIfSameThreadExecutor implements Executor
+ {
+
+ @Override
+ public void execute(final Runnable command)
+ {
+ if(isTaskExecutorThread()
+ || (_executor == null && (Thread.currentThread() instanceof TaskThread
+ && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this)))
+ {
+ command.run();
+ }
+ else
+ {
+ _executor.execute(command);
+ }
+
+ }
+ }
+
+ private static class TaskThread extends Thread
+ {
+
+ private final TaskExecutorImpl _taskExecutor;
+
+ public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
+ {
+ super(r, name);
+ _taskExecutor = taskExecutor;
+ }
+
+ public TaskExecutorImpl getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+ }
}