summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java60
1 files changed, 27 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index adfd178ec3..b1a89c35fa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -31,6 +31,7 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -39,6 +40,7 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
@@ -83,23 +85,23 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.checkNotClosed();
int channelId = _conn.getNextChannelID();
- AMQSession session;
try
{
- session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
+ AMQSession session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
_conn.registerSession(channelId, session);
if (_conn._started)
{
session.start();
}
+
+ return session;
}
catch (Exception e)
{
_logger.error("exception creating session:", e);
throw new JMSAMQException("cannot create session", e);
}
- return session;
}
/**
@@ -153,15 +155,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
if (_logger.isDebugEnabled())
{
_logger.debug("connecting to host: " + brokerDetail.getHost()
- + " port: " + brokerDetail.getPort() + " vhost: "
- + _conn.getVirtualHost() + " username: "
- + _conn.getUsername() + " password: "
- + _conn.getPassword());
+ + " protocol: " + brokerDetail.getTransport()
+ + " port: " + brokerDetail.getPort()
+ + " vhost: " + _conn.getVirtualHost()
+ + " username: " + _conn.getUsername()
+ + " password: " + _conn.getPassword());
}
- ConnectionSettings conSettings = new ConnectionSettings();
- retriveConnectionSettings(conSettings,brokerDetail);
- _qpidConnection.connect(conSettings);
+ ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
+ SSLConfiguration sslConfig = _conn.getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
+ {
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ }
+ _qpidConnection.connect(conSettings, sslFactory);
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
@@ -173,7 +181,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
catch (ConnectionException ce)
{
- AMQConstant code = AMQConstant.REPLY_SUCCESS;
+ AMQConstant code = AMQConstant.CHANNEL_ERROR;
if (ce.getClose() != null && ce.getClose().getReplyCode() != null)
{
code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
@@ -198,6 +206,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
_logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _qpidConnection.resume();
for (AMQSession s : sessions)
{
((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
@@ -259,24 +268,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
- ExceptionListener listener = _conn._exceptionListener;
- if (listener == null)
- {
- _logger.error("connection exception: " + conn, exc);
- }
- else
- {
- String code = null;
- if (close != null)
- {
- code = close.getReplyCode().toString();
- }
-
- JMSException ex = new JMSException(exc.getMessage(), code);
- ex.setLinkedException(exc);
- ex.initCause(exc);
- listener.onException(ex);
- }
+ _conn.exceptionReceived(new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
}
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
@@ -301,14 +293,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return ProtocolVersion.v0_10;
}
- private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail)
+ private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
{
-
+ ConnectionSettings conSettings = new ConnectionSettings();
conSettings.setHost(brokerDetail.getHost());
conSettings.setPort(brokerDetail.getPort());
conSettings.setVhost(_conn.getVirtualHost());
conSettings.setUsername(_conn.getUsername());
conSettings.setPassword(_conn.getPassword());
+ conSettings.setProtocol(brokerDetail.getTransport());
// ------------ sasl options ---------------
if (brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null)
@@ -386,11 +379,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
if (brokerDetail.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null)
{
- conSettings.setTcpNodelay(
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY));
+ conSettings.setTcpNodelay(brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY));
}
conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+
+ return conSettings;
}
// The idle_timeout prop is in milisecs while