diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 08ee7c3705..e1bf007e83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -24,13 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.BasicQosBody; @@ -68,7 +66,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; - public void closeConnection(long timeout) throws JMSException, AMQException { _conn.getProtocolHandler().closeConnection(timeout); @@ -110,9 +107,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate sslContext = SSLContextFactory.buildClientContext( settings.getTrustStorePath(), settings.getTrustStorePassword(), + settings.getTrustStoreType(), settings.getTrustManagerFactoryAlgorithm(), settings.getKeyStorePath(), settings.getKeyStorePassword(), + settings.getKeyStoreType(), settings.getKeyManagerFactoryAlgorithm(), settings.getCertAlias()); } @@ -137,6 +136,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getFailoverPolicy().attainedConnection(); _conn.setConnected(true); + _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); return null; } else @@ -283,7 +283,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); + + // reset the flow control flag + // on opening channel, broker sends flow blocked if virtual host is blocked + // if virtual host is not blocked, then broker does not send flow command + // that's why we need to reset the flow control flag + s.setFlowControl(true); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); + s.resubscribe(); } } |