diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java | 109 |
1 files changed, 93 insertions, 16 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index e03904789d..8bcbba9ac4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,10 +27,13 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -51,6 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Action _underlyingConnectionDeleteTask; private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); private AMQConnectionModel _underlyingConnection; + private final AtomicBoolean _closing = new AtomicBoolean(); public ConnectionAdapter(final AMQConnectionModel conn) { @@ -156,17 +160,59 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { - closeUnderlyingConnection(); - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + asyncClose().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; + } + + @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + return asyncClose(); + + } + + private ListenableFuture<Void> asyncClose() + { + final SettableFuture<Void> closeFuture = SettableFuture.create(); + + _underlyingConnection.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + closeFuture.set(null); + } + }); + + _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + return closeFuture; } @Override protected void onClose() { - closeUnderlyingConnection(); } @Override @@ -233,23 +279,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection // SessionAdapter installs delete task to cause session model object to delete } - private void closeUnderlyingConnection() + + private static class ConnectionCloseFuture implements CloseFuture { - if (_underlyingClosed.compareAndSet(false, true)) + private boolean _closed; + + public synchronized void connectionClosed() { - _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); - try + _closed = true; + notifyAll(); + + } + + @Override + public void runWhenComplete(final Runnable closeRunnable) + { + if (_closed ) { - _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + closeRunnable.run(); } - catch (Exception e) + else { - LOGGER.warn("Exception closing connection " - + _underlyingConnection.getConnectionId() - + " from " - + _underlyingConnection.getRemoteAddressString(), e); - } + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + synchronized (ConnectionCloseFuture.this) + { + while (!_closed) + { + try + { + ConnectionCloseFuture.this.wait(); + } + catch (InterruptedException e) + { + } + } + + closeRunnable.run(); + } + } + }); + + t.setDaemon(true); + t.start(); + } } } |