summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java26
1 files changed, 25 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 43cb5f0c62..350f137a04 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
private final Broker<?> _broker;
private AcceptingTransport _transport;
+ private final AtomicBoolean _closing = new AtomicBoolean();
+ private final SettableFuture _noConnectionsRemain = SettableFuture.create();
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
}
@Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ if (_connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
+ return _noConnectionsRemain;
+ }
+
+ @Override
protected void onClose()
{
if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
{
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
}
+
+
_transport.close();
}
}
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
_connectionCountWarningGiven.compareAndSet(true,false);
}
+ if (_closing.get() && _connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
return openConnections;
}
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<
@Override
public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
{
- return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections;
+ return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
}
@Override