diff options
author | Keith Wall <kwall@apache.org> | 2015-03-11 15:55:25 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-03-11 15:55:25 +0000 |
commit | 1f09c9477270dbbf5fdaed614015db73aabb995b (patch) | |
tree | ca5fa6a594a5c09db304d24c5ef3dd081b786812 | |
parent | 641d37f3cfcb05146cbd99dda0b29ca593601762 (diff) | |
download | qpid-python-1f09c9477270dbbf5fdaed614015db73aabb995b.tar.gz |
Bug fix: Delay shutting download the Port's executor until the port has no remaining connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665911 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 27 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 736a925943..28329f099b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -644,7 +644,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public ListenableFuture<Void> call() throws Exception { - LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName()); + LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { @@ -669,7 +669,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } else { - LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName()); + LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName()); return Futures.immediateFuture(null); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 43cb5f0c62..350f137a04 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.BrokerProperties; @@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< private final Broker<?> _broker; private AcceptingTransport _transport; + private final AtomicBoolean _closing = new AtomicBoolean(); + private final SettableFuture _noConnectionsRemain = SettableFuture.create(); @ManagedObjectFactoryConstructor public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker) @@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< } @Override + protected ListenableFuture<Void> beforeClose() + { + _closing.set(true); + + if (_connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + + return _noConnectionsRemain; + } + + @Override protected void onClose() { if (_transport != null) @@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< { _broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); } + + _transport.close(); } } @@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< _connectionCountWarningGiven.compareAndSet(true,false); } + if (_closing.get() && _connectionCount.get() == 0) + { + _noConnectionsRemain.set(null); + } + return openConnections; } @@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< @Override public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) { - return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections; + return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections ); } @Override |