summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java109
1 files changed, 93 insertions, 16 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index e03904789d..8bcbba9ac4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -27,10 +27,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
private AMQConnectionModel _underlyingConnection;
+ private final AtomicBoolean _closing = new AtomicBoolean();
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
- closeUnderlyingConnection();
- deleted();
- setState(State.DELETED);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncClose().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
+ }
+
+ @Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ return asyncClose();
+
+ }
+
+ private ListenableFuture<Void> asyncClose()
+ {
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
+
+ _underlyingConnection.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ closeFuture.set(null);
+ }
+ });
+
+ _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
}
@Override
protected void onClose()
{
- closeUnderlyingConnection();
}
@Override
@@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
// SessionAdapter installs delete task to cause session model object to delete
}
- private void closeUnderlyingConnection()
+
+ private static class ConnectionCloseFuture implements CloseFuture
{
- if (_underlyingClosed.compareAndSet(false, true))
+ private boolean _closed;
+
+ public synchronized void connectionClosed()
{
- _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
- try
+ _closed = true;
+ notifyAll();
+
+ }
+
+ @Override
+ public void runWhenComplete(final Runnable closeRunnable)
+ {
+ if (_closed )
{
- _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeRunnable.run();
}
- catch (Exception e)
+ else
{
- LOGGER.warn("Exception closing connection "
- + _underlyingConnection.getConnectionId()
- + " from "
- + _underlyingConnection.getRemoteAddressString(), e);
- }
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionCloseFuture.this)
+ {
+ while (!_closed)
+ {
+ try
+ {
+ ConnectionCloseFuture.this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ closeRunnable.run();
+ }
+ }
+ });
+
+ t.setDaemon(true);
+ t.start();
+ }
}
}