summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-11 15:55:25 +0000
committerKeith Wall <kwall@apache.org>2015-03-11 15:55:25 +0000
commit1f09c9477270dbbf5fdaed614015db73aabb995b (patch)
treeca5fa6a594a5c09db304d24c5ef3dd081b786812
parent641d37f3cfcb05146cbd99dda0b29ca593601762 (diff)
downloadqpid-python-1f09c9477270dbbf5fdaed614015db73aabb995b.tar.gz
Bug fix: Delay shutting download the Port's executor until the port has no remaining connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665911 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java26
2 files changed, 27 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 736a925943..28329f099b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -644,7 +644,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public ListenableFuture<Void> call() throws Exception
{
- LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + getName());
+ LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
@@ -669,7 +669,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
else
{
- LOGGER.debug("Closed " + getClass().getSimpleName() + " : " + getName());
+ LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
return Futures.immediateFuture(null);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 43cb5f0c62..350f137a04 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
private final Broker<?> _broker;
private AcceptingTransport _transport;
+ private final AtomicBoolean _closing = new AtomicBoolean();
+ private final SettableFuture _noConnectionsRemain = SettableFuture.create();
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
}
@Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ if (_connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
+ return _noConnectionsRemain;
+ }
+
+ @Override
protected void onClose()
{
if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
{
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
}
+
+
_transport.close();
}
}
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
_connectionCountWarningGiven.compareAndSet(true,false);
}
+ if (_closing.get() && _connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
return openConnections;
}
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
@Override
public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
{
- return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections;
+ return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
}
@Override