diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java | 60 |
1 files changed, 27 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index adfd178ec3..b1a89c35fa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -31,6 +31,7 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -39,6 +40,7 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; @@ -83,23 +85,23 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.checkNotClosed(); int channelId = _conn.getNextChannelID(); - AMQSession session; try { - session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, + AMQSession session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); _conn.registerSession(channelId, session); if (_conn._started) { session.start(); } + + return session; } catch (Exception e) { _logger.error("exception creating session:", e); throw new JMSAMQException("cannot create session", e); } - return session; } /** @@ -153,15 +155,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec if (_logger.isDebugEnabled()) { _logger.debug("connecting to host: " + brokerDetail.getHost() - + " port: " + brokerDetail.getPort() + " vhost: " - + _conn.getVirtualHost() + " username: " - + _conn.getUsername() + " password: " - + _conn.getPassword()); + + " protocol: " + brokerDetail.getTransport() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword()); } - ConnectionSettings conSettings = new ConnectionSettings(); - retriveConnectionSettings(conSettings,brokerDetail); - _qpidConnection.connect(conSettings); + ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail); + SSLConfiguration sslConfig = _conn.getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) + { + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + } + _qpidConnection.connect(conSettings, sslFactory); _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); @@ -173,7 +181,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } catch (ConnectionException ce) { - AMQConstant code = AMQConstant.REPLY_SUCCESS; + AMQConstant code = AMQConstant.CHANNEL_ERROR; if (ce.getClose() != null && ce.getClose().getReplyCode() != null) { code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); @@ -198,6 +206,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + _qpidConnection.resume(); for (AMQSession s : sessions) { ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; @@ -259,24 +268,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } - ExceptionListener listener = _conn._exceptionListener; - if (listener == null) - { - _logger.error("connection exception: " + conn, exc); - } - else - { - String code = null; - if (close != null) - { - code = close.getReplyCode().toString(); - } - - JMSException ex = new JMSException(exc.getMessage(), code); - ex.setLinkedException(exc); - ex.initCause(exc); - listener.onException(ex); - } + _conn.exceptionReceived(new AMQDisconnectedException("Server closed connection and reconnection not permitted.")); } public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E @@ -301,14 +293,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return ProtocolVersion.v0_10; } - private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail) + private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) { - + ConnectionSettings conSettings = new ConnectionSettings(); conSettings.setHost(brokerDetail.getHost()); conSettings.setPort(brokerDetail.getPort()); conSettings.setVhost(_conn.getVirtualHost()); conSettings.setUsername(_conn.getUsername()); conSettings.setPassword(_conn.getPassword()); + conSettings.setProtocol(brokerDetail.getTransport()); // ------------ sasl options --------------- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null) @@ -386,11 +379,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec if (brokerDetail.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null) { - conSettings.setTcpNodelay( - brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)); + conSettings.setTcpNodelay(brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)); } conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); + + return conSettings; } // The idle_timeout prop is in milisecs while |