summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java61
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java89
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java66
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java24
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java33
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java95
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java22
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java19
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java59
48 files changed, 457 insertions, 465 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
index 491a7ac218..e3e6f81f7c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.transport;
-import java.nio.ByteBuffer;
+import static org.apache.qpid.transport.util.Functions.str;
-import static org.apache.qpid.transport.util.Functions.*;
+import java.nio.ByteBuffer;
/**
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 9bdad6b00e..c75adab444 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.util.Logger;
+
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import org.apache.qpid.transport.util.Logger;
-
/**
* ClientDelegate
@@ -46,11 +46,11 @@ public class ClientDelegate extends ConnectionDelegate
- protected final ConnectionSettings _conSettings;
+ private final ConnectionSettings _connectionSettings;
public ClientDelegate(ConnectionSettings settings)
{
- this._conSettings = settings;
+ _connectionSettings = settings;
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -66,15 +66,17 @@ public class ClientDelegate extends ConnectionDelegate
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
- if(this._conSettings.getClientProperties() != null)
+ if(_connectionSettings.getClientProperties() != null)
{
- clientProperties.putAll(_conSettings.getClientProperties());
+ clientProperties.putAll(_connectionSettings.getClientProperties());
}
- clientProperties.put("qpid.session_flow", 1);
- clientProperties.put("qpid.client_pid",getPID());
- clientProperties.put("qpid.client_process",
- System.getProperty("qpid.client_process","Qpid Java Client"));
+ clientProperties.put(ConnectionStartProperties.SESSION_FLOW, 1);
+ clientProperties.put(ConnectionStartProperties.PID, ConnectionStartProperties.getPID());
+ clientProperties.put(ConnectionStartProperties.PROCESS, System.getProperty(ClientProperties.PROCESS_NAME, "Qpid Java Client"));
+ clientProperties.put(ConnectionStartProperties.VERSION_0_10, QpidProperties.getReleaseVersion());
+ clientProperties.put(ConnectionStartProperties.PRODUCT, QpidProperties.getProductName());
+ clientProperties.put(ConnectionStartProperties.PLATFORM, ConnectionStartProperties.getPlatformInfo());
List<Object> brokerMechs = start.getMechanisms();
if (brokerMechs == null || brokerMechs.isEmpty())
@@ -131,7 +133,7 @@ public class ClientDelegate extends ConnectionDelegate
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
+ int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
@@ -146,7 +148,7 @@ public class ClientDelegate extends ConnectionDelegate
//(or that forced by protocol limitations [0xFFFF])
conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
- conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
+ conn.connectionOpen(_connectionSettings.getVhost(), null, Option.INSIST);
}
@Override
@@ -197,31 +199,8 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- private int getPID()
+ public ConnectionSettings getConnectionSettings()
{
- RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean();
- String processName = rtb.getName();
- if (processName != null && processName.indexOf('@')>0)
- {
- try
- {
- return Integer.parseInt(processName.substring(0,processName.indexOf('@')));
- }
- catch(Exception e)
- {
- log.warn("Unable to get the client PID due to error",e);
- return -1;
- }
- }
- else
- {
- log.warn("Unable to get the client PID due to unsupported format : " + processName);
- return -1;
- }
-
+ return _connectionSettings;
}
-
-
-
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index dee5f696b9..b0f1a1bad8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,12 +20,27 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
import static org.apache.qpid.transport.Connection.State.CLOSED;
import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,22 +51,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
-
/**
* Connection
@@ -125,7 +124,6 @@ public class Connection extends ConnectionInvoker
private String userID;
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
- private String _clientId;
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
@@ -161,16 +159,6 @@ public class Connection extends ConnectionInvoker
}
}
- public String getClientId()
- {
- return _clientId;
- }
-
- public void setClientId(String id)
- {
- _clientId = id;
- }
-
void setLocale(String locale)
{
this.locale = locale;
@@ -201,23 +189,12 @@ public class Connection extends ConnectionInvoker
return saslClient;
}
- public void connect(String host, int port, String vhost, String username, String password)
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs)
{
- connect(host, port, vhost, username, password, false);
+ connect(host, port, vhost, username, password, ssl, saslMechs, null);
}
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
- {
- connect(host, port, vhost, username, password, ssl,"PLAIN");
- }
-
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
- {
- connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP);
- }
-
-
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps)
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, Map<String,Object> clientProps)
{
ConnectionSettings settings = new ConnectionSettings();
settings.setHost(host);
@@ -535,7 +512,7 @@ public class Connection extends ConnectionInvoker
exception(new ConnectionException(t));
}
- void closeCode(ConnectionClose close)
+ public void closeCode(ConnectionClose close)
{
synchronized (lock)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 393301659d..fdd35d49ef 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -22,7 +22,7 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.util.Logger;
-import static org.apache.qpid.transport.Connection.State.*;
+import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
/**
@@ -71,12 +71,17 @@ public abstract class ConnectionDelegate
@Override public void connectionClose(Connection conn, ConnectionClose close)
{
- conn.connectionCloseOk();
- conn.getSender().close();
+ sendConnectionCloseOkAndCloseSender(conn);
conn.closeCode(close);
conn.setState(CLOSE_RCVD);
}
+ protected void sendConnectionCloseOkAndCloseSender(Connection conn)
+ {
+ conn.connectionCloseOk();
+ conn.getSender().close();
+ }
+
@Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok)
{
conn.getSender().close();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index 2ee507e2ec..084428d182 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -20,9 +20,24 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_TCP_NODELAY_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.RECEIVE_BUFFER_SIZE_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.SEND_BUFFER_SIZE_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.LEGACY_SEND_BUFFER_SIZE_PROP_NAME;
+
import java.util.Map;
-import org.apache.qpid.configuration.ClientProperties;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.qpid.configuration.QpidProperty;
+
/**
* A ConnectionSettings object can only be associated with
@@ -34,37 +49,36 @@ public class ConnectionSettings
{
public static final String WILDCARD_ADDRESS = "*";
- String protocol = "tcp";
- String host = "localhost";
- String vhost;
- String username = "guest";
- String password = "guest";
- int port = 5672;
- boolean tcpNodelay = Boolean.valueOf(System.getProperty(ClientProperties.QPID_TCP_NODELAY_PROP_NAME,
- System.getProperty(ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME, "true")));
- int maxChannelCount = 32767;
- int maxFrameSize = 65535;
- int heartbeatInterval;
- int readBufferSize = 65535;
- int writeBufferSize = 65535;
- long transportTimeout = 60000;
-
+ private String protocol = "tcp";
+ private String host = "localhost";
+ private String vhost;
+ private String username = "guest";
+ private String password = "guest";
+ private int port = 5672;
+ private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get();
+ private int maxChannelCount = 32767;
+ private int maxFrameSize = 65535;
+ private int heartbeatInterval;
+ private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get();
+ private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();;
+ private long transportTimeout = 60000;
+
// SSL props
- boolean useSSL;
- String keyStorePath = System.getProperty("javax.net.ssl.keyStore");
- String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword");
- String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");;
- String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");;
- String trustStorePath = System.getProperty("javax.net.ssl.trustStore");;
- String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");;
- String certAlias;
- boolean verifyHostname;
+ private boolean useSSL;
+ private String keyStorePath = System.getProperty("javax.net.ssl.keyStore");
+ private String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword");
+ private String keyManagerFactoryAlgorithm = QpidProperty.stringProperty(KeyManagerFactory.getDefaultAlgorithm(), QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME).get();
+ private String trustManagerFactoryAlgorithm = QpidProperty.stringProperty(TrustManagerFactory.getDefaultAlgorithm(), QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME).get();
+ private String trustStorePath = System.getProperty("javax.net.ssl.trustStore");;
+ private String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");;
+ private String certAlias;
+ private boolean verifyHostname;
// SASL props
- String saslMechs = System.getProperty("qpid.sasl_mechs", null);
- String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
- String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
- boolean useSASLEncryption;
+ private String saslMechs = System.getProperty("qpid.sasl_mechs", null);
+ private String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
+ private String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
+ private boolean useSASLEncryption;
private Map<String, Object> _clientProperties;
@@ -288,24 +302,24 @@ public class ConnectionSettings
this.verifyHostname = verifyHostname;
}
- public String getKeyStoreCertType()
+ public String getKeyManagerFactoryAlgorithm()
{
- return keyStoreCertType;
+ return keyManagerFactoryAlgorithm;
}
- public void setKeyStoreCertType(String keyStoreCertType)
+ public void setKeyManagerFactoryAlgorithm(String keyManagerFactoryAlgorithm)
{
- this.keyStoreCertType = keyStoreCertType;
+ this.keyManagerFactoryAlgorithm = keyManagerFactoryAlgorithm;
}
- public String getTrustStoreCertType()
+ public String getTrustManagerFactoryAlgorithm()
{
- return trustStoreCertType;
+ return trustManagerFactoryAlgorithm;
}
- public void setTrustStoreCertType(String trustStoreCertType)
+ public void setTrustManagerFactoryAlgorithm(String trustManagerFactoryAlgorithm)
{
- this.trustStoreCertType = trustStoreCertType;
+ this.trustManagerFactoryAlgorithm = trustManagerFactoryAlgorithm;
}
public int getReadBufferSize()
@@ -337,5 +351,4 @@ public class ConnectionSettings
{
this.transportTimeout = transportTimeout;
}
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
index 543856ca39..7ac75f9163 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport;
-import java.util.*;
+import java.util.List;
/**
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index abf96823cc..b3af99ea9d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -22,9 +22,9 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.network.Frame;
-import java.nio.ByteBuffer;
+import static org.apache.qpid.transport.util.Functions.str;
-import static org.apache.qpid.transport.util.Functions.*;
+import java.nio.ByteBuffer;
/**
* Method
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
index 8d3f7a779a..472beb6bb1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.transport;
+import java.net.InetSocketAddress;
+
/**
* This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing
* buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
@@ -43,4 +45,6 @@ public interface NetworkTransportConfiguration
String getTransport();
Integer getConnectorProcessors();
+
+ InetSocketAddress getAddress();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
deleted file mode 100644
index 68fbb5e8ec..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport;
-
-import java.io.IOException;
-
-public class OpenException extends IOException
-{
-
- public OpenException(String string, Throwable lastException)
- {
- super(string, lastException);
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
index e5b93e40a9..3959dc8d95 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.transport;
-import java.nio.ByteBuffer;
-
+import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.network.NetworkDelegate;
import org.apache.qpid.transport.network.NetworkEvent;
-import org.apache.qpid.transport.network.Frame;
+
+import java.nio.ByteBuffer;
/**
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
index c47171dc4b..413ec8e8fd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.util.Serial.le;
+import static org.apache.qpid.util.Serial.max;
+import static org.apache.qpid.util.Serial.min;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import static org.apache.qpid.util.Serial.*;
-
/**
* Range
@@ -119,6 +122,11 @@ public abstract class Range implements RangeSet
throw new UnsupportedOperationException();
}
+ public void subtract(RangeSet rangeSet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public RangeSet copy()
{
RangeSet rangeSet = RangeSetFactory.createRangeSet();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index 34ebd02777..19990a4610 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.transport;
-import java.util.*;
-
-import static org.apache.qpid.util.Serial.*;
+import java.util.Iterator;
/**
* RangeSet
@@ -51,6 +49,8 @@ public interface RangeSet extends Iterable<Range>
void add(int value);
+ void subtract(final RangeSet other);
+
void clear();
RangeSet copy();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
index 0f19d7e2b2..0f049aba8e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
@@ -22,6 +22,10 @@ package org.apache.qpid.transport;
public class RangeSetFactory
{
+ private RangeSetFactory()
+ {
+ }
+
public static RangeSet createRangeSet()
{
return new RangeSetImpl();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
index f2540afb40..adf18e2920 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.util.Serial.lt;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
-import static org.apache.qpid.util.Serial.lt;
-
public class RangeSetImpl implements RangeSet
{
@@ -150,6 +150,68 @@ public class RangeSetImpl implements RangeSet
ranges.clear();
}
+ public void subtract(final RangeSet other)
+ {
+ final Iterator<Range> otherIter = other.iterator() ;
+ if (otherIter.hasNext())
+ {
+ Range otherRange = otherIter.next();
+ final ListIterator<Range> iter = ranges.listIterator() ;
+ if (iter.hasNext())
+ {
+ Range range = iter.next();
+ do
+ {
+ if (otherRange.getUpper() < range.getLower())
+ {
+ otherRange = nextRange(otherIter) ;
+ }
+ else if (range.getUpper() < otherRange.getLower())
+ {
+ range = nextRange(iter) ;
+ }
+ else
+ {
+ final boolean first = range.getLower() < otherRange.getLower() ;
+ final boolean second = otherRange.getUpper() < range.getUpper() ;
+
+ if (first)
+ {
+ iter.set(Range.newInstance(range.getLower(), otherRange.getLower()-1)) ;
+ if (second)
+ {
+ iter.add(Range.newInstance(otherRange.getUpper()+1, range.getUpper())) ;
+ iter.previous() ;
+ range = iter.next() ;
+ }
+ else
+ {
+ range = nextRange(iter) ;
+ }
+ }
+ else if (second)
+ {
+ range = Range.newInstance(otherRange.getUpper()+1, range.getUpper()) ;
+ iter.set(range) ;
+ otherRange = nextRange(otherIter) ;
+ }
+ else
+ {
+ iter.remove() ;
+ range = nextRange(iter) ;
+ }
+ }
+ }
+ while ((otherRange != null) && (range != null)) ;
+ }
+ }
+ }
+
+ private Range nextRange(final Iterator<Range> iter)
+ {
+ return (iter.hasNext() ? iter.next() : null) ;
+ }
+
public RangeSet copy()
{
return new org.apache.qpid.transport.RangeSetImpl(this);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 07d21c9904..ec409d1c72 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.transport;
-import static org.apache.qpid.transport.Connection.State.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import static org.apache.qpid.transport.Connection.State.OPEN;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/**
* ServerDelegate
@@ -70,9 +69,6 @@ public class ServerDelegate extends ConnectionDelegate
conn.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
- String clientName = (String) ok.getClientProperties().get("clientName");
- conn.setClientId(clientName);
-
if (mechanism == null || mechanism.length() == 0)
{
tuneAuthorizedConnection(conn);
@@ -195,17 +191,11 @@ public class ServerDelegate extends ConnectionDelegate
@Override
public void sessionAttach(Connection conn, SessionAttach atc)
{
- sessionAttachImpl(conn, atc);
- }
-
- protected Session sessionAttachImpl(Connection conn, SessionAttach atc)
- {
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());
+ conn.registerSession(ssn);
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
-
- return ssn;
}
protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 3fc596d0eb..110c73f718 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -21,6 +21,11 @@
package org.apache.qpid.transport;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+
import static org.apache.qpid.transport.Option.COMPLETED;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.TIMELY_REPLY;
@@ -30,11 +35,6 @@ import static org.apache.qpid.transport.Session.State.DETACHED;
import static org.apache.qpid.transport.Session.State.NEW;
import static org.apache.qpid.transport.Session.State.OPEN;
import static org.apache.qpid.transport.Session.State.RESUMING;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.transport.network.Frame;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
import static org.apache.qpid.util.Serial.ge;
import static org.apache.qpid.util.Serial.gt;
import static org.apache.qpid.util.Serial.le;
@@ -161,7 +161,7 @@ public class Session extends SessionInvoker
this.expiry = expiry;
}
- void setClose(boolean close)
+ protected void setClose(boolean close)
{
this.closing = close;
}
@@ -410,7 +410,6 @@ public class Session extends SessionInvoker
log.debug("ID: [%s] %s", this.channel, id);
}
- //if ((id % 65536) == 0)
if ((id & 0xff) == 0)
{
flushProcessed(TIMELY_REPLY);
@@ -514,20 +513,12 @@ public class Session extends SessionInvoker
void knownComplete(RangeSet kc)
{
- synchronized (processedLock)
+ if (kc.size() > 0)
{
- RangeSet newProcessed = RangeSetFactory.createRangeSet();
- for (Range pr : processed)
+ synchronized (processedLock)
{
- for (Range kr : kc)
- {
- for (Range r : pr.subtract(kr))
- {
- newProcessed.add(r);
- }
- }
+ processed.subtract(kc) ;
}
- this.processed = newProcessed;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
index 64f9039484..01fe05c851 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.transport;
-import java.util.Collections;
-
-
/**
* SessionClosedException
*
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
index c4fc9558a1..6095b892f8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.transport;
-import java.util.List;
-
/**
* SessionException
*
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java
index 22bd9f34ad..9b703a3117 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.transport;
-import java.util.List;
-import java.util.Map;
-
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encodable;
import org.apache.qpid.transport.codec.Encoder;
+import java.util.Map;
+
/**
* Struct
@@ -42,7 +41,7 @@ public abstract class Struct implements Encodable
return StructFactory.create(type);
}
- boolean dirty = true;
+ private boolean dirty = true;
public boolean isDirty()
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index 6ff3b21400..27fce6e167 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -20,8 +20,14 @@
*/
package org.apache.qpid.transport.codec;
-import java.io.UnsupportedEncodingException;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -29,8 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.transport.*;
-
/**
* AbstractDecoder
@@ -357,13 +361,13 @@ abstract class AbstractDecoder implements Decoder
private long readSize(Type t)
{
- if (t.fixed)
+ if (t.isFixed())
{
- return t.width;
+ return t.getWidth();
}
else
{
- return readSize(t.width);
+ return readSize(t.getWidth());
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 0ccfcfcb70..2b93697bfc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -20,24 +20,22 @@
*/
package org.apache.qpid.transport.codec;
-import java.io.UnsupportedEncodingException;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.Xid;
+import static org.apache.qpid.transport.util.Functions.lsb;
-import java.util.Collections;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import static org.apache.qpid.transport.util.Functions.*;
-
/**
* AbstractEncoder
@@ -64,6 +62,7 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(Character.class, Type.CHAR);
ENCODINGS.put(byte[].class, Type.VBIN32);
ENCODINGS.put(UUID.class, Type.UUID);
+ ENCODINGS.put(Xid.class, Type.STRUCT32);
}
private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
@@ -362,7 +361,7 @@ abstract class AbstractEncoder implements Encoder
Object value = entry.getValue();
Type type = encoding(value);
writeStr8(key);
- put(type.code);
+ put(type.getCode());
write(type, value);
}
}
@@ -383,7 +382,7 @@ abstract class AbstractEncoder implements Encoder
for (Object value : list)
{
Type type = encoding(value);
- put(type.code);
+ put(type.getCode());
write(type, value);
}
}
@@ -411,7 +410,7 @@ abstract class AbstractEncoder implements Encoder
type = encoding(array.get(0));
}
- put(type.code);
+ put(type.getCode());
writeUint32(array.size());
@@ -423,18 +422,18 @@ abstract class AbstractEncoder implements Encoder
private void writeSize(Type t, int size)
{
- if (t.fixed)
+ if (t.isFixed())
{
- if (size != t.width)
+ if (size != t.getWidth())
{
throw new IllegalArgumentException
- ("size does not match fixed width " + t.width + ": " +
+ ("size does not match fixed width " + t.getWidth() + ": " +
size);
}
}
else
{
- writeSize(t.width, size);
+ writeSize(t.getWidth(), size);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
index 10f67e1cd6..a8ac44200f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.transport.codec;
+import org.apache.qpid.transport.Binary;
+
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import org.apache.qpid.transport.Binary;
-
/**
* Byte Buffer Decoder.
* Decoder concrete implementor using a backing byte buffer for decoding data.
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
index a4df5b5fcb..fb3f91a3ce 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.transport.codec;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
/**
* Decoder interface.
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
index 7d4f02af31..5a3cec5616 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.transport.codec;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
/**
* Encoder interface.
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 8cd5c29f6d..a80b988cea 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,15 +20,23 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.BBDecoder;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.codec.BBDecoder;
-
/**
* Assembler
*
@@ -181,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command = Method.create(commandType);
command.setSync((0x0001 & hdr) != 0);
command.read(dec);
- if (command.hasPayload())
+ if (command.hasPayload() && !frame.isLastSegment())
{
setIncompleteCommand(channel, command);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 1a8d277bba..5a5de597c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -31,6 +29,8 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
+import java.nio.ByteBuffer;
+
/**
* ConnectionBinding
*
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 6ac9df9bc3..fe437ecf93 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -30,16 +30,18 @@ import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+
import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
import static org.apache.qpid.transport.network.Frame.LAST_SEG;
-import static java.lang.Math.min;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import static java.lang.Math.min;
+
/**
* Disassembler
*/
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
index 849355276e..9416c4c9fa 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
@@ -21,15 +21,10 @@
package org.apache.qpid.transport.network;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.util.SliceIterator;
-import java.nio.ByteBuffer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
+import static org.apache.qpid.transport.util.Functions.str;
-import static org.apache.qpid.transport.util.Functions.*;
+import java.nio.ByteBuffer;
/**
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
index b371df639e..4d4274278f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.transport.network;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
+import javax.net.ssl.SSLContext;
+
public interface IncomingNetworkTransport extends NetworkTransport
{
public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index a2885f97bc..86e05db818 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -20,17 +20,19 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SegmentType;
-import static org.apache.qpid.transport.util.Functions.*;
+import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
+import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
+import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
+import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR;
+import static org.apache.qpid.transport.util.Functions.str;
-import static org.apache.qpid.transport.network.InputHandler.State.*;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
/**
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 7384702525..2cc7c14f00 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.Sender;
+
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Sender;
-
public interface NetworkConnection
{
Sender<ByteBuffer> getSender();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index c3c248761c..0ebde483cf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -20,16 +20,13 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
+import javax.net.ssl.SSLContext;
+import java.nio.ByteBuffer;
+
public interface OutgoingNetworkTransport extends NetworkTransport
{
- public NetworkConnection getConnection();
-
public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index da4349ba86..55ba95ad75 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.TransportException;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.TransportException;
-
public class Transport
{
public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
@@ -54,6 +54,10 @@ public class Transport
OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
}
+ private Transport()
+ {
+ }
+
public static IncomingNetworkTransport getIncomingTransportInstance()
{
return (IncomingNetworkTransport) loadTransportClass(
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index bfc77539ce..4046691779 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpid.transport.network.io;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
public class IoNetworkConnection implements NetworkConnection
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 838a662402..42c8334a5d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -21,7 +21,11 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
@@ -29,16 +33,18 @@ import javax.net.ssl.SSLServerSocketFactory;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.util.Logger;
+import org.slf4j.LoggerFactory;
public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
-
- private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
private Socket _socket;
private IoNetworkConnection _connection;
@@ -58,10 +64,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
_socket.setSendBufferSize(sendBufferSize);
_socket.setReceiveBufferSize(receiveBufferSize);
- LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : %s", _socket.getTcpNoDelay());
-
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+ }
+
InetAddress address = InetAddress.getByName(settings.getHost());
_socket.connect(new InetSocketAddress(address, settings.getPort()));
@@ -120,7 +129,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
try
{
_acceptor = new AcceptingThread(config, factory, sslContext);
-
+ _acceptor.setDaemon(false);
_acceptor.start();
}
catch (IOException e)
@@ -133,9 +142,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private class AcceptingThread extends Thread
{
+ private volatile boolean _closed = false;
private NetworkTransportConfiguration _config;
private ProtocolEngineFactory _factory;
- private SSLContext _sslContent;
+ private SSLContext _sslContext;
private ServerSocket _serverSocket;
private AcceptingThread(NetworkTransportConfiguration config,
@@ -145,9 +155,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
_config = config;
_factory = factory;
- _sslContent = sslContext;
+ _sslContext = sslContext;
- InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+ InetSocketAddress address = config.getAddress();
if(sslContext == null)
{
@@ -155,12 +165,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
}
else
{
- SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+ SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
_serverSocket = socketFactory.createServerSocket();
}
- _serverSocket.bind(address);
_serverSocket.setReuseAddress(true);
+ _serverSocket.bind(address);
}
@@ -171,6 +181,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
*/
public void close()
{
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
if (!_serverSocket.isClosed())
{
try
@@ -189,11 +202,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
try
{
- while (true)
+ while (!_closed)
{
+ Socket socket = null;
try
{
- Socket socket = _serverSocket.accept();
+ socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
final Integer sendBufferSize = _config.getSendBufferSize();
@@ -206,27 +220,58 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
-
engine.setNetworkConnection(connection, connection.getSender());
connection.start();
-
-
}
catch(RuntimeException e)
{
- LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+ LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ closeSocketIfNecessary(socket);
+ }
+ catch(IOException e)
+ {
+ if(!_closed)
+ {
+ LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ closeSocketIfNecessary(socket);
+ try
+ {
+ //Delay to avoid tight spinning the loop during issues such as too many open files
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.debug("Stopping acceptor due to interrupt request");
+ _closed = true;
+ }
+ }
}
}
}
- catch (IOException e)
+ finally
{
- LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
- + _config.getPort());
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort());
+ }
}
}
-
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 5b714434d9..7e63071c16 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -26,6 +26,7 @@ import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
+import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
@@ -33,8 +34,6 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLSocket;
-
/**
* IoReceiver
*
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 427487c879..a58fea47d2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -18,6 +18,14 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.common.Closeable;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
import static org.apache.qpid.transport.util.Functions.mod;
import java.io.IOException;
@@ -28,14 +36,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.util.Logger;
-
public final class IoSender implements Runnable, Sender<ByteBuffer>
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 9fd65c6e51..51ef266ee9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -20,22 +20,10 @@
*/
package org.apache.qpid.transport.network.security;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
-import org.apache.qpid.transport.network.security.sasl.SASLSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLSender;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+import java.nio.ByteBuffer;
public interface SecurityLayer
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
index 08934004a8..442800c529 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -21,7 +21,10 @@
package org.apache.qpid.transport.network.security;
import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
@@ -34,6 +37,10 @@ import java.nio.ByteBuffer;
public class SecurityLayerFactory
{
+ private SecurityLayerFactory()
+ {
+ }
+
public static SecurityLayer newInstance(ConnectionSettings settings)
{
@@ -71,10 +78,10 @@ public class SecurityLayerFactory
sslCtx = SSLContextFactory
.buildClientContext(settings.getTrustStorePath(),
settings.getTrustStorePassword(),
- settings.getTrustStoreCertType(),
+ settings.getTrustManagerFactoryAlgorithm(),
settings.getKeyStorePath(),
settings.getKeyStorePassword(),
- settings.getKeyStoreCertType(),
+ settings.getKeyManagerFactoryAlgorithm(),
settings.getCertAlias());
}
catch (Exception e)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
index 7964239e31..625e1a77c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
@@ -21,21 +21,19 @@ package org.apache.qpid.transport.network.security.sasl;
*/
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
public abstract class SASLEncryptor implements ConnectionListener
{
- protected SaslClient saslClient;
- protected boolean securityLayerEstablished = false;
- protected int sendBuffSize;
- protected int recvBuffSize;
+ private SaslClient saslClient;
+ private boolean securityLayerEstablished = false;
+ private int sendBuffSize;
+ private int recvBuffSize;
public boolean isSecurityLayerEstablished()
{
@@ -63,4 +61,19 @@ public abstract class SASLEncryptor implements ConnectionListener
public void closed(Connection conn) {}
public abstract void securityLayerEstablished();
+
+ public SaslClient getSaslClient()
+ {
+ return saslClient;
+ }
+
+ public int getSendBuffSize()
+ {
+ return sendBuffSize;
+ }
+
+ public int getRecvBuffSize()
+ {
+ return recvBuffSize;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
index 86106318ef..a100b96412 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -21,18 +21,16 @@ package org.apache.qpid.transport.network.security.sasl;
*/
-import java.nio.ByteBuffer;
-
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.util.Logger;
+import javax.security.sasl.SaslException;
+import java.nio.ByteBuffer;
+
public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
- Receiver<ByteBuffer> delegate;
+ private Receiver<ByteBuffer> delegate;
private byte[] netData;
private static final Logger log = Logger.get(SASLReceiver.class);
@@ -58,11 +56,11 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer>
{
while (buf.hasRemaining())
{
- int length = Math.min(buf.remaining(),recvBuffSize);
+ int length = Math.min(buf.remaining(), getRecvBuffSize());
buf.get(netData, 0, length);
try
{
- byte[] out = saslClient.unwrap(netData, 0, length);
+ byte[] out = getSaslClient().unwrap(netData, 0, length);
delegate.received(ByteBuffer.wrap(out));
}
catch (SaslException e)
@@ -79,7 +77,7 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer>
public void securityLayerEstablished()
{
- netData = new byte[recvBuffSize];
+ netData = new byte[getRecvBuffSize()];
log.debug("SASL Security Layer Established");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 2d9e4e9a7e..61d54a8386 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -21,19 +21,17 @@ package org.apache.qpid.transport.network.security.sasl;
*/
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.util.Logger;
+import javax.security.sasl.SaslException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
- protected Sender<ByteBuffer> delegate;
+ private Sender<ByteBuffer> delegate;
private byte[] appData;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final Logger log = Logger.get(SASLSender.class);
@@ -54,7 +52,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
{
try
{
- saslClient.dispose();
+ getSaslClient().dispose();
}
catch (SaslException e)
{
@@ -80,14 +78,14 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
{
while (buf.hasRemaining())
{
- int length = Math.min(buf.remaining(),sendBuffSize);
- log.debug("sendBuffSize %s", sendBuffSize);
+ int length = Math.min(buf.remaining(), getSendBuffSize());
+ log.debug("sendBuffSize %s", getSendBuffSize());
log.debug("buf.remaining() %s", buf.remaining());
buf.get(appData, 0, length);
try
{
- byte[] out = saslClient.wrap(appData, 0, length);
+ byte[] out = getSaslClient().wrap(appData, 0, length);
log.debug("out.length %s", out.length);
delegate.send(ByteBuffer.wrap(out));
@@ -112,7 +110,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
public void securityLayerEstablished()
{
- appData = new byte[sendBuffSize];
+ appData = new byte[getSendBuffSize()];
log.debug("SASL Security Layer Established");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
index 4391e8adfc..3ab028c8a8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.transport.network.security.ssl;
+import org.apache.qpid.transport.util.Logger;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
import java.io.IOException;
import java.net.Socket;
import java.security.GeneralSecurityException;
@@ -28,25 +33,19 @@ import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.X509ExtendedKeyManager;
-
-import org.apache.qpid.transport.util.Logger;
-
public class QpidClientX509KeyManager extends X509ExtendedKeyManager
{
private static final Logger log = Logger.get(QpidClientX509KeyManager.class);
- X509ExtendedKeyManager delegate;
- String alias;
+ private X509ExtendedKeyManager delegate;
+ private String alias;
public QpidClientX509KeyManager(String alias, String keyStorePath,
- String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException
+ String keyStorePassword, String keyManagerFactoryAlgorithmName) throws GeneralSecurityException, IOException
{
this.alias = alias;
KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyStoreCertType);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName);
kmf.init(ks, keyStorePassword.toCharArray());
this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 8ad40bbfd3..13a16d07b5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -20,19 +20,17 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
public class SSLReceiver implements Receiver<ByteBuffer>
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 6f5aa6d86e..88943695d4 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -19,20 +19,18 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
-
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import javax.net.ssl.SSLException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
public class SSLSender implements Sender<ByteBuffer>
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
index 6bb038a581..71a73db71f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.transport.network.security.ssl;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -30,19 +35,14 @@ import java.security.Principal;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.util.Logger;
-
public class SSLUtil
{
private static final Logger log = Logger.get(SSLUtil.class);
-
+
+ private SSLUtil()
+ {
+ }
+
public static void verifyHostname(SSLEngine engine,String hostnameExpected)
{
try
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
index 5761228642..bd3e9bbcbc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -22,7 +22,7 @@ package org.apache.qpid.transport.util;
import java.nio.ByteBuffer;
-import static java.lang.Math.*;
+import static java.lang.Math.min;
/**
@@ -31,8 +31,11 @@ import static java.lang.Math.*;
* @author Rafael H. Schloming
*/
-public class Functions
+public final class Functions
{
+ private Functions()
+ {
+ }
public static final int mod(int n, int m)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java
deleted file mode 100644
index 3db29847b2..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.util;
-
-import java.nio.ByteBuffer;
-
-import java.util.Iterator;
-
-
-/**
- * SliceIterator
- *
- * @author Rafael H. Schloming
- */
-
-public class SliceIterator implements Iterator<ByteBuffer>
-{
-
- final private Iterator<ByteBuffer> iterator;
-
- public SliceIterator(Iterator<ByteBuffer> iterator)
- {
- this.iterator = iterator;
- }
-
- public boolean hasNext()
- {
- return iterator.hasNext();
- }
-
- public ByteBuffer next()
- {
- return iterator.next().slice();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
-}