diff options
Diffstat (limited to 'qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java')
-rw-r--r-- | qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 46 |
1 files changed, 43 insertions, 3 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 55bc8e4f96..7ce445a9b2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -28,7 +28,9 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*; import javax.jms.IllegalStateException; import javax.jms.Queue; +import javax.net.ssl.SSLContext; +import java.security.NoSuchAlgorithmException; import java.util.*; import org.apache.qpid.amqp_1_0.type.Symbol; @@ -39,6 +41,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { private final String _protocol; + private final SSLContext _sslContext; private ConnectionMetaData _connectionMetaData; private volatile ExceptionListener _exceptionListener; @@ -55,13 +58,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private final String _username; private final String _password; private String _remoteHost; - private final boolean _ssl; private String _clientId; private String _queuePrefix; private String _topicPrefix; private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions; + private int _maxPrefetch; + + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } private static enum State { @@ -96,6 +104,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException { + this(protocol, + host, + port, + username, + password, + clientId, + remoteHost, + ssl ? getDefaultSSLContext() : null, + maxSessions); + } + + private static SSLContext getDefaultSSLContext() throws JMSException + { + try + { + return SSLContext.getDefault(); + } + catch (NoSuchAlgorithmException e) + { + JMSException jmsException = new JMSException(e.getMessage()); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } + } + + public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, SSLContext sslContext, int maxSessions) throws JMSException + { _protocol = protocol; _host = host; _port = port; @@ -103,7 +139,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _password = password; _clientId = clientId; _remoteHost = remoteHost; - _ssl = ssl; + _sslContext = sslContext; _maxSessions = maxSessions; } @@ -121,7 +157,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect try { _conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host, - _port, _username, _password, container, _remoteHost, _ssl, + _port, _username, _password, container, _remoteHost, _sslContext, _maxSessions - 1); _conn.setConnectionErrorTask(new ConnectionErrorTask()); // TODO - retrieve negotiated AMQP version @@ -190,6 +226,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect SessionImpl session = new SessionImpl(this, acknowledgeMode); session.setQueueSession(_isQueueConnection); session.setTopicSession(_isTopicConnection); + if(_maxPrefetch != 0) + { + session.setMaxPrefetch(_maxPrefetch); + } boolean connectionStarted = false; synchronized(_lock) |