diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-13 17:17:50 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-13 17:17:50 +0000 |
commit | d9b4476966d0a9efe208a24dc4df2fd7957d53c3 (patch) | |
tree | f67456883f7ee792ff17ac0390093a41fdfb185b | |
parent | 82700edf3062785e05b3cb6eebe1b8137128c824 (diff) | |
download | qpid-python-d9b4476966d0a9efe208a24dc4df2fd7957d53c3.tar.gz |
QPID-5475 : [Java Broker] Add test for REST api client cert auth
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557775 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 288 insertions, 75 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index f72c9b3020..fbc0cf39ca 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -26,6 +26,8 @@ import java.net.URL; import java.net.URLConnection; import java.net.URLDecoder; import java.net.URLStreamHandler; +import java.util.HashMap; +import java.util.Map; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; @@ -50,6 +52,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0); + private int _maxPrefetch; public ConnectionFactoryImpl(final String host, @@ -135,117 +138,200 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection connection.setTopicPrefix(_topicPrefix); connection.setUseBinaryMessageId(_useBinaryMessageId); connection.setSyncPublish(_syncPublish); + if(_maxPrefetch != 0) + { + connection.setMaxPrefetch(_maxPrefetch); + } return connection; } - public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException + public void setMaxPrefetch(final int maxPrefetch) { - URL url = new URL(null, urlString, new URLStreamHandler() - { - @Override - protected URLConnection openConnection(URL u) throws IOException - { - throw new UnsupportedOperationException(); - } - }); - String protocol = url.getProtocol(); - if(protocol == null || "".equals(protocol)) - { - protocol = "amqp"; - } -/* - else if(!protocol.equals("amqp") && !protocol.equals("amqps")) - { - throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); - } -*/ - String host = url.getHost(); - int port = url.getPort(); + _maxPrefetch = maxPrefetch; + } - boolean ssl = false; + private static class ConnectionOptions + { + String username; + String password; + String clientId; + String remoteHost; - if(port == -1) - { - if("amqps".equals(protocol)) - { - port = 5671; - ssl = true; - } - else - { - port = 5672; - } - } - else if("amqps".equals(protocol)) + boolean binaryMessageId = true; + boolean syncPublish; + int maxSessions; + public boolean ssl; + public int maxPrefetch; + } + + + + private static abstract class OptionSetter + { + + private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new HashMap<String, OptionSetter>(); + private final String _name; + private final String _description; + + public OptionSetter(String name, String description) { - ssl = true; + OPTION_SETTER_MAP.put(name.toLowerCase(), this); + _name = name; + _description = description; } - String userInfo = url.getUserInfo(); - String username = null; - String password = null; - String clientId = null; - String remoteHost = null; - - boolean binaryMessageId = true; - boolean syncPublish = false; - int maxSessions = 0; + public abstract void setOption(ConnectionOptions options, String value) throws MalformedURLException; - if(userInfo != null) + public static void parseOptions(URL url, ConnectionOptions options) throws MalformedURLException { - String[] components = userInfo.split(":",2); - username = URLDecoder.decode(components[0]); - if(components.length == 2) + String query = url.getQuery(); + if(query != null) { - password = URLDecoder.decode(components[1]); + for(String param : query.split("&")) + { + + String[] keyValuePair = param.split("=",2); + OptionSetter setter = OPTION_SETTER_MAP.get(keyValuePair[0]); + if(setter != null) + { + setter.setOption(options, keyValuePair[1]); + } + else + { + throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL"); + } + + } } } - String query = url.getQuery(); - if(query != null) + } + + private static final OptionSetter[] _options = { - for(String param : query.split("&")) + new OptionSetter("clientid", "JMS client id / AMQP container id") { - String[] keyValuePair = param.split("=",2); - if(keyValuePair[0].equalsIgnoreCase("clientid")) + public void setOption(ConnectionOptions options, String value) { - clientId = keyValuePair[1]; + options.clientId = value; } - else if(keyValuePair[0].equalsIgnoreCase("ssl")) + }, + new OptionSetter("ssl", "Set to \"true\" to use SSL encryption") + { + public void setOption(ConnectionOptions options, String value) { - ssl = Boolean.valueOf(keyValuePair[1]); + options.ssl = Boolean.valueOf(value); } - else if(keyValuePair[0].equalsIgnoreCase("remote-host")) + }, + new OptionSetter("remote-host", "AMQP remote host") + { + public void setOption(ConnectionOptions options, String value) { - remoteHost = keyValuePair[1]; + options.remoteHost = value; } - else if (keyValuePair[0].equalsIgnoreCase("binary-messageid")) + }, + new OptionSetter("binary-messageid", "Use binary (rather than String) message ids") + { + public void setOption(ConnectionOptions options, String value) { - binaryMessageId = Boolean.parseBoolean(keyValuePair[1]); + options.binaryMessageId = Boolean.parseBoolean(value); } - else if (keyValuePair[0].equalsIgnoreCase("sync-publish")) + }, + new OptionSetter("sync-publish", "Wait for acknowledge when sending messages") + { + public void setOption(ConnectionOptions options, String value) { - syncPublish = Boolean.parseBoolean(keyValuePair[1]); + options.syncPublish = Boolean.parseBoolean(value); } - else if(keyValuePair[0].equalsIgnoreCase("max-sessions")) + }, + new OptionSetter("max-sessions", "set maximum number of sessions allowed") + { + public void setOption(ConnectionOptions options, String value) { - maxSessions = Integer.parseInt(keyValuePair[1]); + options.maxSessions = Integer.parseInt(value); } - else + }, + new OptionSetter("max-prefetch", "set maximum number of messages prefetched on a link") + { + public void setOption(ConnectionOptions options, String value) { - throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL: "+urlString); + options.maxPrefetch = Integer.parseInt(value); } } + }; + + public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException + { + URL url = new URL(null, urlString, new URLStreamHandler() + { + @Override + protected URLConnection openConnection(URL u) throws IOException + { + throw new UnsupportedOperationException(); + } + }); + String protocol = url.getProtocol(); + if (protocol == null || "".equals(protocol)) + { + protocol = "amqp"; + } + String host = url.getHost(); + int port = url.getPort(); + + final ConnectionOptions options = new ConnectionOptions(); + + if (port == -1) + { + if ("amqps".equals(protocol)) + { + port = 5671; + options.ssl = true; + } + else + { + port = 5672; + } + } + else if ("amqps".equals(protocol)) + { + options.ssl = true; + } + + + String userInfo = url.getUserInfo(); + + if (userInfo != null) + { + String[] components = userInfo.split(":", 2); + options.username = URLDecoder.decode(components[0]); + if (components.length == 2) + { + options.password = URLDecoder.decode(components[1]); + } } - if(remoteHost == null) + OptionSetter.parseOptions(url, options); + + if (options.remoteHost == null) { - remoteHost = host; + options.remoteHost = host; } ConnectionFactoryImpl connectionFactory = - new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions); - connectionFactory.setUseBinaryMessageId(binaryMessageId); - connectionFactory.setSyncPublish(syncPublish); + new ConnectionFactoryImpl(protocol, + host, + port, + options.username, + options.password, + options.clientId, + options.remoteHost, + options.ssl, + options.maxSessions); + connectionFactory.setUseBinaryMessageId(options.binaryMessageId); + connectionFactory.setSyncPublish(options.syncPublish); + if (options.maxPrefetch != 0) + { + connectionFactory.setMaxPrefetch(options.maxPrefetch); + } return connectionFactory; @@ -308,4 +394,6 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { _syncPublish = syncPublish; } + + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 55bc8e4f96..8929cfb618 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -62,6 +62,12 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions; + private int _maxPrefetch; + + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } private static enum State { @@ -190,6 +196,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect SessionImpl session = new SessionImpl(this, acknowledgeMode); session.setQueueSession(_isQueueConnection); session.setTopicSession(_isTopicConnection); + if(_maxPrefetch != 0) + { + session.setMaxPrefetch(_maxPrefetch); + } boolean connectionStarted = false; synchronized(_lock) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index fd6f09d162..96ee1e984d 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -76,6 +76,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi private Binary _lastTxnUpdate; private final List<Message> _recoverReplayMessages = new ArrayList<Message>(); private final List<Message> _replaymessages = new ArrayList<Message>(); + private int _maxPrefetch = 100; MessageConsumerImpl(final Destination destination, final SessionImpl session, @@ -117,6 +118,10 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName()); } _session = session; + if(session.getMaxPrefetch() != 0) + { + _maxPrefetch = session.getMaxPrefetch(); + } _receiver = createClientReceiver(); _receiver.setRemoteErrorListener(new Runnable() @@ -442,7 +447,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi public void start() { - _receiver.setCredit(UnsignedInteger.valueOf(100), true); + _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true); } public Queue getQueue() throws JMSException @@ -487,4 +492,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi } } } + + public int getMaxPrefetch() + { + return _maxPrefetch; + } + + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index e5e6ea938e..a1cf0ef4e7 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -81,6 +81,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession private boolean _isQueueSession; private boolean _isTopicSession; private Transaction _txn; + private int _maxPrefetch; protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException { @@ -843,6 +844,16 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn; } + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } + + public int getMaxPrefetch() + { + return _maxPrefetch; + } + private class Dispatcher implements Runnable { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java new file mode 100644 index 0000000000..e92b38b4e0 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.rest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; +import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE; +import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE_PASSWORD; +import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE; +import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD; + +public class BrokerRestHttpsClientCertAuthTest extends QpidRestTestCase +{ + + @Override + public void setUp() throws Exception + { + setSystemProperty("javax.net.debug", "ssl"); + super.setUp(); + setSystemProperty("javax.net.ssl.trustStore", TRUSTSTORE); + setSystemProperty("javax.net.ssl.trustStorePassword", TRUSTSTORE_PASSWORD); + setSystemProperty("javax.net.ssl.keystore", KEYSTORE); + setSystemProperty("javax.net.ssl.keyStorePassword", KEYSTORE_PASSWORD); + + } + + @Override + protected void customizeConfiguration() throws ConfigurationException, IOException + { + super.customizeConfiguration(); + getRestTestHelper().setUseSslAuth(true); + Map<String, Object> newAttributes = new HashMap<String, Object>(); + newAttributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.HTTP)); + newAttributes.put(Port.TRANSPORTS, Collections.singleton(Transport.SSL)); + newAttributes.put(Port.KEY_STORE, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE); + newAttributes.put(Port.TRUST_STORES, Collections.singleton(TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE)); + newAttributes.put(Port.NEED_CLIENT_AUTH,"true"); + + + Map<String, Object> externalProviderAttributes = new HashMap<String, Object>(); + externalProviderAttributes.put(AuthenticationProvider.TYPE, ExternalAuthenticationManagerFactory.PROVIDER_TYPE); + externalProviderAttributes.put(AuthenticationProvider.NAME, EXTERNAL_AUTHENTICATION_PROVIDER); + getBrokerConfiguration().addAuthenticationProviderConfiguration(externalProviderAttributes); + + // set password authentication provider on http port for the tests + getBrokerConfiguration().setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER, + EXTERNAL_AUTHENTICATION_PROVIDER); + + getBrokerConfiguration().setObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, newAttributes); + } + + public void testGetWithHttps() throws Exception + { + Map<String, Object> saslData = getRestTestHelper().getJsonAsMap("/rest/sasl"); + + Asserts.assertAttributesPresent(saslData, "user"); + } +} |