diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-21 02:45:35 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-21 02:45:35 +0000 |
commit | 6974c985d4bb9d6dd365b304916d25c44418403e (patch) | |
tree | 722cb4c974c494a92bc464369d7d1e2a41f70ed8 /qpid/java/common/src/main/java | |
parent | 40b4cc1b947f0b4665ddd0990ebb9368ad1052ca (diff) | |
download | qpid-python-6974c985d4bb9d6dd365b304916d25c44418403e.tar.gz |
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352.
- Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352.
- Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged.
- Modified the ClientDelegate to handle heartbeat and idelTimeout value properly.
- Added support to specify config options via the connection URL - QPID-2351
- Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@901506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java')
4 files changed, 253 insertions, 42 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 4200a8352c..b12fbb75e6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -61,24 +61,13 @@ public class ClientDelegate extends ConnectionDelegate } catch (GSSException ignore) {} } - private String vhost; - private String username; - private String password; private List<String> clientMechs; - private String protocol; - private String serverName; + private ConnectionSettings conSettings; - public ClientDelegate(String vhost, String username, String password,String saslMechs) + public ClientDelegate(ConnectionSettings settings) { - this.vhost = vhost; - this.username = username; - this.password = password; - this.clientMechs = Arrays.asList(saslMechs.split(" ")); - - // Looks kinda of silly but the Sun SASL Kerberos client uses the - // protocol + servername as the service key. - this.protocol = System.getProperty("qpid.sasl_protocol","AMQP"); - this.serverName = System.getProperty("qpid.sasl_server_name","localhost"); + this.conSettings = settings; + this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); } public void init(Connection conn, ProtocolHeader hdr) @@ -128,11 +117,16 @@ public class ClientDelegate extends ConnectionDelegate try { + Map<String,Object> saslProps = new HashMap<String,Object>(); + if (conSettings.isUseSASLEncryption()) + { + saslProps.put(Sasl.QOP, "auth-conf"); + } UsernamePasswordCallbackHandler handler = new UsernamePasswordCallbackHandler(); - handler.initialise(username, password); + handler.initialise(conSettings.getUsername(), conSettings.getPassword()); SaslClient sc = Sasl.createSaslClient - (mechs, null, protocol, serverName, null, handler); + (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler); conn.setSaslClient(sc); byte[] response = sc.hasInitialResponse() ? @@ -164,15 +158,16 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - int hb_interval = calculateHeartbeatInterval(conn, + int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() ); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), hb_interval); - conn.setIdleTimeout(hb_interval*1000); - conn.connectionOpen(vhost, null, Option.INSIST); + // The idle timeout is twice the heartbeat amount (in milisecs) + conn.setIdleTimeout(hb_interval*1000*2); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) @@ -198,9 +193,9 @@ public class ClientDelegate extends ConnectionDelegate /** * Currently the spec specified the min and max for heartbeat using secs */ - private int calculateHeartbeatInterval(Connection conn,int min, int max) + private int calculateHeartbeatInterval(int heartbeat,int min, int max) { - int i = conn.getIdleTimeout()/1000; + int i = heartbeat; if (i == 0) { log.warn("Idle timeout is zero. Heartbeats are disabled"); @@ -245,7 +240,7 @@ public class ClientDelegate extends ConnectionDelegate private String getUserID() { log.debug("Obtaining userID from kerberos"); - String service = protocol + "@" + serverName; + String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 9f1916e1d1..17a13561c8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -76,7 +76,7 @@ public class Connection extends ConnectionInvoker private State state = NEW; final private Object lock = new Object(); private long timeout = 60000; - private ConnectionListener listener = new DefaultConnectionListener(); + private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>(); private ConnectionException error = null; private int channelMax = 1; @@ -86,7 +86,8 @@ public class Connection extends ConnectionInvoker private int idleTimeout = 0; private String _authorizationID; private String userID; - + private ConnectionSettings conSettings; + // want to make this final private int _connectionId; @@ -97,16 +98,9 @@ public class Connection extends ConnectionInvoker this.delegate = delegate; } - public void setConnectionListener(ConnectionListener listener) + public void addConnectionListener(ConnectionListener listener) { - if (listener == null) - { - this.listener = new DefaultConnectionListener(); - } - else - { - this.listener = listener; - } + listeners.add(listener); } public Sender<ProtocolEvent> getSender() @@ -154,7 +148,7 @@ public class Connection extends ConnectionInvoker this.saslClient = saslClient; } - SaslClient getSaslClient() + public SaslClient getSaslClient() { return saslClient; } @@ -171,13 +165,30 @@ public class Connection extends ConnectionInvoker public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) { + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(host); + settings.setPort(port); + settings.setVhost(vhost); + settings.setUsername(username); + settings.setPassword(password); + settings.setUseSSL(ssl); + settings.setSaslMechs(saslMechs); + connect(settings); + } + + public void connect(ConnectionSettings settings) + { synchronized (lock) { + conSettings = settings; state = OPENING; - userID = username; - delegate = new ClientDelegate(vhost, username, password,saslMechs); + userID = settings.getUsername(); + delegate = new ClientDelegate(settings); - IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); + IoTransport.connect(settings.getHost(), + settings.getPort(), + ConnectionBinding.get(this), + settings.isUseSSL()); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -218,7 +229,10 @@ public class Connection extends ConnectionInvoker } } - listener.opened(this); + for (ConnectionListener listener: listeners) + { + listener.opened(this); + } } public Session createSession() @@ -407,7 +421,11 @@ public class Connection extends ConnectionInvoker } } - listener.exception(this, e); + for (ConnectionListener listener: listeners) + { + listener.exception(this, e); + } + } public void exception(Throwable t) @@ -460,7 +478,10 @@ public class Connection extends ConnectionInvoker setState(CLOSED); } - listener.closed(this); + for (ConnectionListener listener: listeners) + { + listener.closed(this); + } } public void close() @@ -560,5 +581,10 @@ public class Connection extends ConnectionInvoker { return String.format("conn:%x", System.identityHashCode(this)); } + + public ConnectionSettings getConnectionSettings() + { + return conSettings; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java new file mode 100644 index 0000000000..b25c2d7fd0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public class ConnectionSettings +{ + String protocol = "tcp"; + String host = "localhost"; + String vhost; + String username = "guest"; + String password = "guest"; + String saslMechs = "PLAIN"; + String saslProtocol = "AMQP"; + String saslServerName = "localhost"; + int port = 5672; + int maxChannelCount = 32767; + int maxFrameSize = 65535; + int heartbeatInterval; + boolean useSSL; + boolean useSASLEncryption; + boolean tcpNodelay; + + public boolean isTcpNodelay() + { + return tcpNodelay; + } + + public void setTcpNodelay(boolean tcpNodelay) + { + this.tcpNodelay = tcpNodelay; + } + + public int getHeartbeatInterval() + { + return heartbeatInterval; + } + + public void setHeartbeatInterval(int heartbeatInterval) + { + this.heartbeatInterval = heartbeatInterval; + } + + public String getProtocol() + { + return protocol; + } + + public void setProtocol(String protocol) + { + this.protocol = protocol; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public void setPort(int port) + { + this.port = port; + } + + public String getVhost() + { + return vhost; + } + + public void setVhost(String vhost) + { + this.vhost = vhost; + } + + public String getUsername() + { + return username; + } + + public void setUsername(String username) + { + this.username = username; + } + + public String getPassword() + { + return password; + } + + public void setPassword(String password) + { + this.password = password; + } + + public boolean isUseSSL() + { + return useSSL; + } + + public void setUseSSL(boolean useSSL) + { + this.useSSL = useSSL; + } + + public boolean isUseSASLEncryption() + { + return useSASLEncryption; + } + + public void setUseSASLEncryption(boolean useSASLEncryption) + { + this.useSASLEncryption = useSASLEncryption; + } + + public String getSaslMechs() + { + return saslMechs; + } + + public void setSaslMechs(String saslMechs) + { + this.saslMechs = saslMechs; + } + + public String getSaslProtocol() + { + return saslProtocol; + } + + public void setSaslProtocol(String saslProtocol) + { + this.saslProtocol = saslProtocol; + } + + public String getSaslServerName() + { + return saslServerName; + } + + public void setSaslServerName(String saslServerName) + { + this.saslServerName = saslServerName; + } + + public int getMaxChannelCount() + { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) + { + this.maxChannelCount = maxChannelCount; + } + + public int getMaxFrameSize() + { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) + { + this.maxFrameSize = maxFrameSize; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 30d7e52d33..383fd6131a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -297,7 +297,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { try { - socket.setSoTimeout(i*2); + socket.setSoTimeout(i); } catch (Exception e) { |