summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java46
1 files changed, 43 insertions, 3 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 55bc8e4f96..7ce445a9b2 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -28,7 +28,9 @@ import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Queue;
+import javax.net.ssl.SSLContext;
+import java.security.NoSuchAlgorithmException;
import java.util.*;
import org.apache.qpid.amqp_1_0.type.Symbol;
@@ -39,6 +41,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
private final String _protocol;
+ private final SSLContext _sslContext;
private ConnectionMetaData _connectionMetaData;
private volatile ExceptionListener _exceptionListener;
@@ -55,13 +58,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private final String _username;
private final String _password;
private String _remoteHost;
- private final boolean _ssl;
private String _clientId;
private String _queuePrefix;
private String _topicPrefix;
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
private int _maxSessions;
+ private int _maxPrefetch;
+
+ public void setMaxPrefetch(final int maxPrefetch)
+ {
+ _maxPrefetch = maxPrefetch;
+ }
private static enum State
{
@@ -96,6 +104,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
{
+ this(protocol,
+ host,
+ port,
+ username,
+ password,
+ clientId,
+ remoteHost,
+ ssl ? getDefaultSSLContext() : null,
+ maxSessions);
+ }
+
+ private static SSLContext getDefaultSSLContext() throws JMSException
+ {
+ try
+ {
+ return SSLContext.getDefault();
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
+ }
+
+ public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, SSLContext sslContext, int maxSessions) throws JMSException
+ {
_protocol = protocol;
_host = host;
_port = port;
@@ -103,7 +139,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
_password = password;
_clientId = clientId;
_remoteHost = remoteHost;
- _ssl = ssl;
+ _sslContext = sslContext;
_maxSessions = maxSessions;
}
@@ -121,7 +157,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
try
{
_conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host,
- _port, _username, _password, container, _remoteHost, _ssl,
+ _port, _username, _password, container, _remoteHost, _sslContext,
_maxSessions - 1);
_conn.setConnectionErrorTask(new ConnectionErrorTask());
// TODO - retrieve negotiated AMQP version
@@ -190,6 +226,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
SessionImpl session = new SessionImpl(this, acknowledgeMode);
session.setQueueSession(_isQueueConnection);
session.setTopicSession(_isTopicConnection);
+ if(_maxPrefetch != 0)
+ {
+ session.setMaxPrefetch(_maxPrefetch);
+ }
boolean connectionStarted = false;
synchronized(_lock)