summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
commit085486ebe5ff21133b9caf1c31625ac6ea356568 (patch)
tree7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/java/common
parent60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff)
parente2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff)
downloadqpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java64
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java86
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java55
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java54
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java97
13 files changed, 304 insertions, 118 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
index 32a45da60c..deed32346f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
@@ -42,7 +42,6 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
throws AMQFrameDecodingException, IOException
{
ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
- ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
final int classAndMethod = in.readInt();
int classId = classAndMethod >> 16;
int methodId = classAndMethod & 0xFFFF;
@@ -115,116 +114,117 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
ChannelOpenBody.process(channelId, in, methodProcessor);
break;
case 0x00140014:
- ChannelFlowBody.process(in, channelMethodProcessor);
+ ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140015:
- ChannelFlowOkBody.process(in, channelMethodProcessor);
+ ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140028:
- ChannelCloseBody.process(in, channelMethodProcessor);
+ ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140029:
- channelMethodProcessor.receiveChannelCloseOk();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk();
break;
// ACCESS_CLASS:
case 0x001e000a:
- AccessRequestBody.process(in, channelMethodProcessor);
+ AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// EXCHANGE_CLASS:
case 0x0028000a:
- ExchangeDeclareBody.process(in, channelMethodProcessor);
+ ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00280014:
- ExchangeDeleteBody.process(in, channelMethodProcessor);
+ ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00280016:
- ExchangeBoundBody.process(in, channelMethodProcessor);
+ ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// QUEUE_CLASS:
case 0x0032000a:
- QueueDeclareBody.process(in, channelMethodProcessor);
+ QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320014:
- QueueBindBody.process(in, channelMethodProcessor);
+ QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x0032001e:
- QueuePurgeBody.process(in, channelMethodProcessor);
+ QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320028:
- QueueDeleteBody.process(in, channelMethodProcessor);
+ QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320032:
- QueueUnbindBody.process(in, channelMethodProcessor);
+ QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// BASIC_CLASS:
case 0x003c000a:
- BasicQosBody.process(in, channelMethodProcessor);
+ BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0014:
- BasicConsumeBody.process(in, channelMethodProcessor);
+ BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c001e:
- BasicCancelBody.process(in, channelMethodProcessor);
+ BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0028:
- BasicPublishBody.process(in, channelMethodProcessor);
+ BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0046:
- BasicGetBody.process(in, channelMethodProcessor);
+ BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0050:
- BasicAckBody.process(in, channelMethodProcessor);
+ BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c005a:
- BasicRejectBody.process(in, channelMethodProcessor);
+ BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0064:
- BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(),
+ methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0066:
- BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c006e:
- BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0078:
- BasicNackBody.process(in, channelMethodProcessor);
+ BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// CONFIRM CLASS:
case 0x0055000a:
- ConfirmSelectBody.process(in, channelMethodProcessor);
+ ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// TX_CLASS:
case 0x005a000a:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxSelect();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect();
}
break;
case 0x005a0014:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxCommit();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit();
}
break;
case 0x005a001e:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxRollback();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback();
}
break;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
index cdd44d3443..d077cc9717 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
@@ -20,12 +20,8 @@
*/
package org.apache.qpid.common;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.InputStream;
-import java.util.Map;
import java.util.Properties;
/**
@@ -37,18 +33,9 @@ import java.util.Properties;
*
* <p>To get the build version of any Qpid code call the {@link #main} method. This version string is usually also
* printed to the console on broker start up.
- * <p>
- * TODO Code to locate/load/log properties can be factored into a reusable properties utils class. Avoid having this
- * same snippet of loading code scattered in many places.
- * <p>
- * TODO Could also add a build number property for a sequential build number assigned by an automated build system, for
- * build reproducability purposes.
*/
public class QpidProperties
{
- /** Used for debugging purposes. */
- private static final Logger _logger = LoggerFactory.getLogger(QpidProperties.class);
-
/** The name of the version properties file to load from the class path. */
public static final String VERSION_RESOURCE = "qpidversion.properties";
@@ -68,53 +55,43 @@ public class QpidProperties
private static final String DEFAULT = "unknown";
/** Holds the product name. */
- private static String productName = DEFAULT;
+ private static final String productName;
/** Holds the product version. */
- private static String releaseVersion = DEFAULT;
+ private static final String releaseVersion;
/** Holds the source code revision. */
- private static String buildVersion = DEFAULT;
+ private static final String buildVersion;
+
+ private static final Properties properties = new Properties();
// Loads the values from the version properties file.
static
{
- Properties props = new Properties();
- try
+ try(InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE))
{
- InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE);
- if (propertyStream == null)
+ if (propertyStream != null)
{
- _logger.warn("Unable to find resource " + VERSION_RESOURCE + " from classloader");
- }
- else
- {
- props.load(propertyStream);
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Dumping QpidProperties");
- for (Map.Entry<Object, Object> entry : props.entrySet())
- {
- _logger.debug("Property: " + entry.getKey() + " Value: " + entry.getValue());
- }
-
- _logger.debug("End of property dump");
- }
-
- productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY);
- String versionSuffix = (String) props.get(RELEASE_VERSION_SUFFIX);
- String version = readPropertyValue(props, RELEASE_VERSION_PROPERTY);
- releaseVersion = versionSuffix == null || "".equals(versionSuffix) ? version : version + ";" + versionSuffix;
- buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY);
+ properties.load(propertyStream);
}
}
catch (IOException e)
{
- // Log a warning about this and leave the values initialized to unknown.
- _logger.error("Could not load version.properties resource: " + e, e);
+ // Ignore, most likely running within an IDE, values will have the DEFAULT text
}
+
+ String versionSuffix = properties.getProperty(RELEASE_VERSION_SUFFIX);
+ String version = properties.getProperty(RELEASE_VERSION_PROPERTY, DEFAULT);
+
+ productName = properties.getProperty(PRODUCT_NAME_PROPERTY, DEFAULT);
+ releaseVersion = versionSuffix == null || "".equals(versionSuffix) ? version : version + ";" + versionSuffix;
+ buildVersion = properties.getProperty(BUILD_VERSION_PROPERTY, DEFAULT);
+ }
+
+ public static Properties asProperties()
+ {
+ return new Properties(properties);
}
/**
@@ -158,27 +135,6 @@ public class QpidProperties
}
/**
- * Helper method to extract a named property from properties.
- *
- * @param props The properties.
- * @param propertyName The named property to extract.
- *
- * @return The extracted property or a default value if the properties do not contain the named property.
- *
- * @todo A bit pointless.
- */
- private static String readPropertyValue(Properties props, String propertyName)
- {
- String retVal = (String) props.get(propertyName);
- if (retVal == null)
- {
- retVal = DEFAULT;
- }
-
- return retVal;
- }
-
- /**
* Prints the versioning information to the console. This is extremely usefull for identifying Qpid code in the
* wild, where the origination of the code has been forgotten.
*
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 86f5ddeeed..89e4c3ccdd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -18,6 +18,17 @@
package org.apache.qpid.configuration;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class centralized the Qpid client properties.
*
@@ -25,6 +36,8 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientProperties.class);
+
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -292,6 +305,48 @@ public class ClientProperties
*/
public static final String QPID_USE_LEGACY_GETQUEUEDEPTH_BEHAVIOUR = "qpid.use_legacy_getqueuedepth_behavior";
+ static
+ {
+ // force load of common properties
+ Class<CommonProperties> commonPropertiesClass = CommonProperties.class;
+
+ Properties props = new Properties();
+ String initialProperties = System.getProperty("qpid.client_properties_file");
+ URL initialPropertiesLocation = null;
+ try
+ {
+ if (initialProperties == null)
+ {
+ initialPropertiesLocation = ClientProperties.class.getClassLoader().getResource("qpid-client.properties");
+ }
+ else
+ {
+ initialPropertiesLocation = (new File(initialProperties)).toURI().toURL();
+ }
+
+ if (initialPropertiesLocation != null)
+ {
+ props.load(initialPropertiesLocation.openStream());
+ }
+ }
+ catch (MalformedURLException e)
+ {
+ LOGGER.warn("Could not open client properties file '"+initialProperties+"'.", e);
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Could not open client properties file '" + initialPropertiesLocation + "'.", e);
+ }
+
+ Set<String> propertyNames = new HashSet<>(props.stringPropertyNames());
+ propertyNames.removeAll(System.getProperties().stringPropertyNames());
+ for (String propName : propertyNames)
+ {
+ System.setProperty(propName, props.getProperty(propName));
+ }
+
+ }
+
private ClientProperties()
{
//No instances
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
index 6bae93a1b8..a052a02748 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
@@ -20,6 +20,19 @@
*/
package org.apache.qpid.configuration;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.common.QpidProperties;
+
/**
* Centralised record of Qpid common properties.
*
@@ -27,6 +40,8 @@ package org.apache.qpid.configuration;
*/
public class CommonProperties
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CommonProperties.class);
+
/**
* The timeout used by the IO layer for timeouts such as send timeout in IoSender, and the close timeout for IoSender and IoReceiver
*/
@@ -36,6 +51,45 @@ public class CommonProperties
public static final String HANDSHAKE_TIMEOUT_PROP_NAME = "qpid.handshake_timeout";
public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
+ static
+ {
+
+ Properties props = new Properties(QpidProperties.asProperties());
+ String initialProperties = System.getProperty("qpid.common_properties_file");
+ URL initialPropertiesLocation = null;
+ try
+ {
+ if (initialProperties == null)
+ {
+ initialPropertiesLocation = CommonProperties.class.getClassLoader().getResource("qpid-common.properties");
+ }
+ else
+ {
+ initialPropertiesLocation = (new File(initialProperties)).toURI().toURL();
+ }
+
+ if (initialPropertiesLocation != null)
+ {
+ props.load(initialPropertiesLocation.openStream());
+ }
+ }
+ catch (MalformedURLException e)
+ {
+ LOGGER.warn("Could not open common properties file '"+initialProperties+"'.", e);
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Could not open common properties file '" + initialPropertiesLocation + "'.", e);
+ }
+
+ Set<String> propertyNames = new HashSet<>(props.stringPropertyNames());
+ propertyNames.removeAll(System.getProperties().stringPropertyNames());
+ for (String propName : propertyNames)
+ {
+ System.setProperty(propName, props.getProperty(propName));
+ }
+
+ }
private CommonProperties()
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 39f27b0fe0..e7b16362e2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -535,6 +535,8 @@ public class Connection extends ConnectionInvoker
connectionLost.set(true);
synchronized (lock)
{
+ log.error(e, "exception: %s", e.getMessage());
+
switch (state)
{
case OPENING:
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
index 12f8d801dc..7af3b7af39 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
@@ -21,6 +21,7 @@
package org.apache.qpid.transport;
import java.net.InetSocketAddress;
+import java.util.Collection;
/**
* This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing
@@ -30,17 +31,21 @@ import java.net.InetSocketAddress;
public interface NetworkTransportConfiguration
{
// Taken from Socket
- Boolean getTcpNoDelay();
+ boolean getTcpNoDelay();
// The amount of memory in bytes to allocate to the incoming buffer
- Integer getReceiveBufferSize();
+ int getReceiveBufferSize();
// The amount of memory in bytes to allocate to the outgoing buffer
- Integer getSendBufferSize();
+ int getSendBufferSize();
InetSocketAddress getAddress();
boolean needClientAuth();
boolean wantClientAuth();
+
+ Collection<String> getEnabledCipherSuites();
+
+ Collection<String> getDisabledCipherSuites();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 61beae4c25..cd01cddb05 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -26,12 +26,15 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLSocket;
+
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.util.SystemUtils;
public final class IoSender implements Runnable, ByteBufferSender
@@ -58,6 +61,12 @@ public final class IoSender implements Runnable, ByteBufferSender
private final Thread senderThread;
private IoReceiver _receiver;
private final String _remoteSocketAddress;
+ private static final boolean shutdownBroken;
+
+ static
+ {
+ shutdownBroken = SystemUtils.isWindows();
+ }
private volatile Throwable exception = null;
@@ -314,6 +323,18 @@ public final class IoSender implements Runnable, ByteBufferSender
}
}
}
+
+ if (!shutdownBroken && !(socket instanceof SSLSocket))
+ {
+ try
+ {
+ socket.shutdownOutput();
+ }
+ catch (IOException e)
+ {
+ //pass
+ }
+ }
}
public void setReceiver(IoReceiver receiver)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index fe6e707f7e..2f3ba81285 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.transport.network.io;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.security.Principal;
+import java.util.Collection;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -61,7 +62,10 @@ public class NonBlockingConnection implements NetworkConnection
final SSLContext sslContext,
final boolean wantClientAuth,
final boolean needClientAuth,
- final Runnable onTransportEncryptionAction, final SelectorThread selectorThread)
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites,
+ final Runnable onTransportEncryptionAction,
+ final SelectorThread selectorThread)
{
_socketChannel = socketChannel;
_timeout = timeout;
@@ -69,10 +73,20 @@ public class NonBlockingConnection implements NetworkConnection
_selector = selectorThread;
_nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this,
- delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+ delegate,
+ receiveBufferSize,
+ ticker,
+ encryptionSet,
+ sslContext,
+ wantClientAuth,
+ needClientAuth,
+ enabledCipherSuites,
+ disabledCipherSuites,
+ onTransportEncryptionAction);
}
+
public Ticker getTicker()
{
return _ticker;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 6c96c0a18e..1c49efc294 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -152,6 +152,8 @@ public class NonBlockingNetworkTransport
_sslContext,
_config.wantClientAuth(),
_config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
new Runnable()
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 02099dee15..6599f4443c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
@@ -87,6 +88,8 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
final SSLContext sslContext,
final boolean wantClientAuth,
final boolean needClientAuth,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites,
final Runnable onTransportEncryptionAction)
{
_connection = connection;
@@ -112,6 +115,8 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
_sslEngine = _sslContext.createSSLEngine();
_sslEngine.setUseClientMode(false);
SSLUtil.removeSSLv3Support(_sslEngine);
+ SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
+
if(needClientAuth)
{
_sslEngine.setNeedClientAuth(true);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index ce3bace9e8..49e4ad631a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -187,7 +187,10 @@ public class SSLReceiver implements ByteBufferReceiver
}
catch(SSLException e)
{
- log.error(e, "Error caught in SSLReceiver");
+ if (log.isDebugEnabled())
+ {
+ log.debug(e, "Error caught in SSLReceiver");
+ }
_sslStatus.setSslErrorFlag();
synchronized(_sslStatus.getSslLock())
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 755f7430ba..3d133cb9b7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -142,7 +142,7 @@ public class SSLSender implements ByteBufferSender
public void send(ByteBuffer appData)
{
- if (closed.get())
+ if (closed.get() && !_sslStatus.getSslErrorFlag())
{
throw new SenderException("SSL Sender is closed");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
index b6ae2ab4a3..67dde84440 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
@@ -24,6 +24,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
@@ -33,7 +36,10 @@ import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -266,7 +272,35 @@ public class SSLUtil
return ks;
}
- public static void removeSSLv3Support(final SSLEngine engine)
+ private static interface SSLEntity
+ {
+ String[] getEnabledCipherSuites();
+
+ void setEnabledCipherSuites(String[] strings);
+
+ String[] getEnabledProtocols();
+
+ void setEnabledProtocols(String[] protocols);
+
+ String[] getSupportedCipherSuites();
+
+ String[] getSupportedProtocols();
+ }
+
+ private static SSLEntity asSSLEntity(final Object object, final Class<?> clazz)
+ {
+ return (SSLEntity) Proxy.newProxyInstance(SSLEntity.class.getClassLoader(), new Class[] { SSLEntity.class }, new InvocationHandler()
+ {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
+ {
+ Method delegateMethod = clazz.getMethod(method.getName(), method.getParameterTypes());
+ return delegateMethod.invoke(object, args);
+ }
+ }) ;
+ }
+
+ private static void removeSSLv3Support(final SSLEntity engine)
{
List<String> enabledProtocols = Arrays.asList(engine.getEnabledProtocols());
if(enabledProtocols.contains(SSLV3_PROTOCOL))
@@ -277,26 +311,61 @@ public class SSLUtil
}
}
- public static void removeSSLv3Support(final SSLSocket socket)
+ public static void removeSSLv3Support(final SSLEngine engine)
{
- List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols());
- if(enabledProtocols.contains(SSLV3_PROTOCOL))
- {
- List<String> allowedProtocols = new ArrayList<>(enabledProtocols);
- allowedProtocols.remove(SSLV3_PROTOCOL);
- socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()]));
- }
+ removeSSLv3Support(asSSLEntity(engine, SSLEngine.class));
}
+ public static void removeSSLv3Support(final SSLSocket socket)
+ {
+ removeSSLv3Support(asSSLEntity(socket, SSLSocket.class));
+ }
public static void removeSSLv3Support(final SSLServerSocket socket)
{
- List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols());
- if(enabledProtocols.contains(SSLV3_PROTOCOL))
+ removeSSLv3Support(asSSLEntity(socket, SSLServerSocket.class));
+ }
+
+ private static void updateEnabledCipherSuites(final SSLEntity entity,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites)
+ {
+ if(enabledCipherSuites != null && !enabledCipherSuites.isEmpty())
{
- List<String> allowedProtocols = new ArrayList<>(enabledProtocols);
- allowedProtocols.remove(SSLV3_PROTOCOL);
- socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()]));
+ final Set<String> supportedSuites =
+ new HashSet<>(Arrays.asList(entity.getSupportedCipherSuites()));
+ supportedSuites.retainAll(enabledCipherSuites);
+ entity.setEnabledCipherSuites(supportedSuites.toArray(new String[supportedSuites.size()]));
+ }
+
+ if(disabledCipherSuites != null && !disabledCipherSuites.isEmpty())
+ {
+ final Set<String> enabledSuites = new HashSet<>(Arrays.asList(entity.getEnabledCipherSuites()));
+ enabledSuites.removeAll(disabledCipherSuites);
+ entity.setEnabledCipherSuites(enabledSuites.toArray(new String[enabledSuites.size()]));
}
+
+ }
+
+
+ public static void updateEnabledCipherSuites(final SSLEngine engine,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites)
+ {
+ updateEnabledCipherSuites(asSSLEntity(engine, SSLEngine.class), enabledCipherSuites, disabledCipherSuites);
+ }
+
+ public static void updateEnabledCipherSuites(final SSLServerSocket socket,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites)
+ {
+ updateEnabledCipherSuites(asSSLEntity(socket, SSLServerSocket.class), enabledCipherSuites, disabledCipherSuites);
+ }
+
+ public static void updateEnabledCipherSuites(final SSLSocket socket,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites)
+ {
+ updateEnabledCipherSuites(asSSLEntity(socket, SSLSocket.class), enabledCipherSuites, disabledCipherSuites);
}
}