diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 43cb5f0c62..350f137a04 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.BrokerProperties; @@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< private final Broker<?> _broker; private AcceptingTransport _transport; + private final AtomicBoolean _closing = new AtomicBoolean(); + private final SettableFuture _noConnectionsRemain = SettableFuture.create(); @ManagedObjectFactoryConstructor public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker) @@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< } @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + if (_connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + + return _noConnectionsRemain; + } + + @Override protected void onClose() { if (_transport != null) @@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< { _broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); } + + _transport.close(); } } @@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< _connectionCountWarningGiven.compareAndSet(true,false); } + if (_closing.get() && _connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + return openConnections; } @@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< @Override public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) { - return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections; + return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections ); } @Override |