summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
commit6974c985d4bb9d6dd365b304916d25c44418403e (patch)
tree722cb4c974c494a92bc464369d7d1e2a41f70ed8 /qpid/java/common/src/main/java
parent40b4cc1b947f0b4665ddd0990ebb9368ad1052ca (diff)
downloadqpid-python-6974c985d4bb9d6dd365b304916d25c44418403e.tar.gz
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352. - Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352. - Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged. - Modified the ClientDelegate to handle heartbeat and idelTimeout value properly. - Added support to specify config options via the connection URL - QPID-2351 - Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@901506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java62
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java190
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java2
4 files changed, 253 insertions, 42 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 4200a8352c..b12fbb75e6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -61,24 +61,13 @@ public class ClientDelegate extends ConnectionDelegate
} catch (GSSException ignore) {}
}
- private String vhost;
- private String username;
- private String password;
private List<String> clientMechs;
- private String protocol;
- private String serverName;
+ private ConnectionSettings conSettings;
- public ClientDelegate(String vhost, String username, String password,String saslMechs)
+ public ClientDelegate(ConnectionSettings settings)
{
- this.vhost = vhost;
- this.username = username;
- this.password = password;
- this.clientMechs = Arrays.asList(saslMechs.split(" "));
-
- // Looks kinda of silly but the Sun SASL Kerberos client uses the
- // protocol + servername as the service key.
- this.protocol = System.getProperty("qpid.sasl_protocol","AMQP");
- this.serverName = System.getProperty("qpid.sasl_server_name","localhost");
+ this.conSettings = settings;
+ this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -128,11 +117,16 @@ public class ClientDelegate extends ConnectionDelegate
try
{
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+ if (conSettings.isUseSASLEncryption())
+ {
+ saslProps.put(Sasl.QOP, "auth-conf");
+ }
UsernamePasswordCallbackHandler handler =
new UsernamePasswordCallbackHandler();
- handler.initialise(username, password);
+ handler.initialise(conSettings.getUsername(), conSettings.getPassword());
SaslClient sc = Sasl.createSaslClient
- (mechs, null, protocol, serverName, null, handler);
+ (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -164,15 +158,16 @@ public class ClientDelegate extends ConnectionDelegate
@Override public void connectionTune(Connection conn, ConnectionTune tune)
{
conn.setChannelMax(tune.getChannelMax());
- int hb_interval = calculateHeartbeatInterval(conn,
+ int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
conn.connectionTuneOk(tune.getChannelMax(),
tune.getMaxFrameSize(),
hb_interval);
- conn.setIdleTimeout(hb_interval*1000);
- conn.connectionOpen(vhost, null, Option.INSIST);
+ // The idle timeout is twice the heartbeat amount (in milisecs)
+ conn.setIdleTimeout(hb_interval*1000*2);
+ conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
@Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
@@ -198,9 +193,9 @@ public class ClientDelegate extends ConnectionDelegate
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
- private int calculateHeartbeatInterval(Connection conn,int min, int max)
+ private int calculateHeartbeatInterval(int heartbeat,int min, int max)
{
- int i = conn.getIdleTimeout()/1000;
+ int i = heartbeat;
if (i == 0)
{
log.warn("Idle timeout is zero. Heartbeats are disabled");
@@ -245,7 +240,7 @@ public class ClientDelegate extends ConnectionDelegate
private String getUserID()
{
log.debug("Obtaining userID from kerberos");
- String service = protocol + "@" + serverName;
+ String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
GSSManager manager = GSSManager.getInstance();
try
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 9f1916e1d1..17a13561c8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -76,7 +76,7 @@ public class Connection extends ConnectionInvoker
private State state = NEW;
final private Object lock = new Object();
private long timeout = 60000;
- private ConnectionListener listener = new DefaultConnectionListener();
+ private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
private ConnectionException error = null;
private int channelMax = 1;
@@ -86,7 +86,8 @@ public class Connection extends ConnectionInvoker
private int idleTimeout = 0;
private String _authorizationID;
private String userID;
-
+ private ConnectionSettings conSettings;
+
// want to make this final
private int _connectionId;
@@ -97,16 +98,9 @@ public class Connection extends ConnectionInvoker
this.delegate = delegate;
}
- public void setConnectionListener(ConnectionListener listener)
+ public void addConnectionListener(ConnectionListener listener)
{
- if (listener == null)
- {
- this.listener = new DefaultConnectionListener();
- }
- else
- {
- this.listener = listener;
- }
+ listeners.add(listener);
}
public Sender<ProtocolEvent> getSender()
@@ -154,7 +148,7 @@ public class Connection extends ConnectionInvoker
this.saslClient = saslClient;
}
- SaslClient getSaslClient()
+ public SaslClient getSaslClient()
{
return saslClient;
}
@@ -171,13 +165,30 @@ public class Connection extends ConnectionInvoker
public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
{
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(host);
+ settings.setPort(port);
+ settings.setVhost(vhost);
+ settings.setUsername(username);
+ settings.setPassword(password);
+ settings.setUseSSL(ssl);
+ settings.setSaslMechs(saslMechs);
+ connect(settings);
+ }
+
+ public void connect(ConnectionSettings settings)
+ {
synchronized (lock)
{
+ conSettings = settings;
state = OPENING;
- userID = username;
- delegate = new ClientDelegate(vhost, username, password,saslMechs);
+ userID = settings.getUsername();
+ delegate = new ClientDelegate(settings);
- IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
+ IoTransport.connect(settings.getHost(),
+ settings.getPort(),
+ ConnectionBinding.get(this),
+ settings.isUseSSL());
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -218,7 +229,10 @@ public class Connection extends ConnectionInvoker
}
}
- listener.opened(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.opened(this);
+ }
}
public Session createSession()
@@ -407,7 +421,11 @@ public class Connection extends ConnectionInvoker
}
}
- listener.exception(this, e);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.exception(this, e);
+ }
+
}
public void exception(Throwable t)
@@ -460,7 +478,10 @@ public class Connection extends ConnectionInvoker
setState(CLOSED);
}
- listener.closed(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.closed(this);
+ }
}
public void close()
@@ -560,5 +581,10 @@ public class Connection extends ConnectionInvoker
{
return String.format("conn:%x", System.identityHashCode(this));
}
+
+ public ConnectionSettings getConnectionSettings()
+ {
+ return conSettings;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
new file mode 100644
index 0000000000..b25c2d7fd0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public class ConnectionSettings
+{
+ String protocol = "tcp";
+ String host = "localhost";
+ String vhost;
+ String username = "guest";
+ String password = "guest";
+ String saslMechs = "PLAIN";
+ String saslProtocol = "AMQP";
+ String saslServerName = "localhost";
+ int port = 5672;
+ int maxChannelCount = 32767;
+ int maxFrameSize = 65535;
+ int heartbeatInterval;
+ boolean useSSL;
+ boolean useSASLEncryption;
+ boolean tcpNodelay;
+
+ public boolean isTcpNodelay()
+ {
+ return tcpNodelay;
+ }
+
+ public void setTcpNodelay(boolean tcpNodelay)
+ {
+ this.tcpNodelay = tcpNodelay;
+ }
+
+ public int getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
+
+ public void setHeartbeatInterval(int heartbeatInterval)
+ {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol)
+ {
+ this.protocol = protocol;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ public String getVhost()
+ {
+ return vhost;
+ }
+
+ public void setVhost(String vhost)
+ {
+ this.vhost = vhost;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public void setUsername(String username)
+ {
+ this.username = username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ public void setPassword(String password)
+ {
+ this.password = password;
+ }
+
+ public boolean isUseSSL()
+ {
+ return useSSL;
+ }
+
+ public void setUseSSL(boolean useSSL)
+ {
+ this.useSSL = useSSL;
+ }
+
+ public boolean isUseSASLEncryption()
+ {
+ return useSASLEncryption;
+ }
+
+ public void setUseSASLEncryption(boolean useSASLEncryption)
+ {
+ this.useSASLEncryption = useSASLEncryption;
+ }
+
+ public String getSaslMechs()
+ {
+ return saslMechs;
+ }
+
+ public void setSaslMechs(String saslMechs)
+ {
+ this.saslMechs = saslMechs;
+ }
+
+ public String getSaslProtocol()
+ {
+ return saslProtocol;
+ }
+
+ public void setSaslProtocol(String saslProtocol)
+ {
+ this.saslProtocol = saslProtocol;
+ }
+
+ public String getSaslServerName()
+ {
+ return saslServerName;
+ }
+
+ public void setSaslServerName(String saslServerName)
+ {
+ this.saslServerName = saslServerName;
+ }
+
+ public int getMaxChannelCount()
+ {
+ return maxChannelCount;
+ }
+
+ public void setMaxChannelCount(int maxChannelCount)
+ {
+ this.maxChannelCount = maxChannelCount;
+ }
+
+ public int getMaxFrameSize()
+ {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize)
+ {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 30d7e52d33..383fd6131a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -297,7 +297,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
try
{
- socket.setSoTimeout(i*2);
+ socket.setSoTimeout(i);
}
catch (Exception e)
{