diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/model')
38 files changed, 3495 insertions, 980 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java b/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java index 6000886956..417f6036ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java @@ -24,6 +24,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.group.GroupPrincipalAccessor; + public interface AuthenticationProvider extends ConfiguredObject { @@ -52,4 +55,15 @@ public interface AuthenticationProvider extends ConfiguredObject TYPE)); //children Collection<VirtualHostAlias> getVirtualHostPortBindings(); + + String getName(); + + /** + * A temporary method to create SubjectCreator. + * + * TODO: move all the functionality from SubjectCreator into AuthenticationProvider + */ + SubjectCreator getSubjectCreator(); + + void setGroupAccessor(GroupPrincipalAccessor groupPrincipalAccessor); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java index 08b01a1b65..fbecf1965b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java @@ -20,12 +20,20 @@ */ package org.apache.qpid.server.model; +import java.net.SocketAddress; import java.security.AccessControlException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Map; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + public interface Broker extends ConfiguredObject { @@ -44,9 +52,49 @@ public interface Broker extends ConfiguredObject String STATE = "state"; String TIME_TO_LIVE = "timeToLive"; String UPDATED = "updated"; + String DEFAULT_AUTHENTICATION_PROVIDER = "defaultAuthenticationProvider"; + String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost"; + + String ALERT_THRESHOLD_MESSAGE_AGE = "alertThresholdMessageAge"; + String ALERT_THRESHOLD_MESSAGE_COUNT = "alertThresholdMessageCount"; + String ALERT_THRESHOLD_QUEUE_DEPTH = "alertThresholdQueueDepth"; + String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize"; + String ALERT_REPEAT_GAP = "alertRepeatGap"; + String FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; + String FLOW_CONTROL_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; + String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; + String DEAD_LETTER_QUEUE_ENABLED = "deadLetterQueueEnabled"; + String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod"; + + String SESSION_COUNT_LIMIT = "sessionCountLimit"; + String HEART_BEAT_DELAY = "heartBeatDelay"; + String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod"; + String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled"; + + /* + * A temporary attribute to pass the path to ACL file. + * TODO: It should be a part of AuthorizationProvider. + */ + String ACL_FILE = "aclFile"; + + /* + * A temporary attributes to set the broker default key/trust stores. + * TODO: Remove them after adding a full support to configure KeyStore/TrustStore via management layers. + */ + String KEY_STORE_PATH = "keyStorePath"; + String KEY_STORE_PASSWORD = "keyStorePassword"; + String KEY_STORE_CERT_ALIAS = "keyStoreCertAlias"; + String TRUST_STORE_PATH = "trustStorePath"; + String TRUST_STORE_PASSWORD = "trustStorePassword"; + + /* + * A temporary attributes to set the broker group file. + * TODO: Remove them after adding a full support to configure authorization providers via management layers. + */ + String GROUP_FILE = "groupFile"; // Attributes - public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collection<String> AVAILABLE_ATTRIBUTES = Collections.unmodifiableList( Arrays.asList(BUILD_VERSION, BYTES_RETAINED, @@ -62,7 +110,32 @@ public interface Broker extends ConfiguredObject NAME, STATE, TIME_TO_LIVE, - UPDATED)); + UPDATED, + DEFAULT_AUTHENTICATION_PROVIDER, + DEFAULT_VIRTUAL_HOST, + ALERT_THRESHOLD_MESSAGE_AGE, + ALERT_THRESHOLD_MESSAGE_COUNT, + ALERT_THRESHOLD_QUEUE_DEPTH, + ALERT_THRESHOLD_MESSAGE_SIZE, + ALERT_REPEAT_GAP, + FLOW_CONTROL_SIZE_BYTES, + FLOW_CONTROL_RESUME_SIZE_BYTES, + MAXIMUM_DELIVERY_ATTEMPTS, + DEAD_LETTER_QUEUE_ENABLED, + HOUSEKEEPING_CHECK_PERIOD, + SESSION_COUNT_LIMIT, + HEART_BEAT_DELAY, + STATISTICS_REPORTING_PERIOD, + STATISTICS_REPORTING_RESET_ENABLED, + + ACL_FILE, + KEY_STORE_PATH, + KEY_STORE_PASSWORD, + KEY_STORE_CERT_ALIAS, + TRUST_STORE_PATH, + TRUST_STORE_PASSWORD, + GROUP_FILE + )); //children Collection < VirtualHost > getVirtualHosts(); @@ -75,6 +148,49 @@ public interface Broker extends ConfiguredObject LifetimePolicy lifetime, long ttl, Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException; - void deleteVirtualHost(VirtualHost virtualHost) - throws AccessControlException, IllegalStateException; + AuthenticationProvider getDefaultAuthenticationProvider(); + + Collection<GroupProvider> getGroupProviders(); + + /** + * A temporary hack to expose root message logger via broker instance. + * TODO We need a better way to do operational logging, for example, via logging listeners + */ + RootMessageLogger getRootMessageLogger(); + + /** + * A temporary hack to expose security manager via broker instance. + * TODO We need to add and implement an authorization provider configured object instead + */ + SecurityManager getSecurityManager(); + + /** + * TODO: A temporary hack to expose log recorder via broker instance. + */ + LogRecorder getLogRecorder(); + + VirtualHost findVirtualHostByName(String name); + + /** + * Get the SubjectCreator for the given socket address. + * TODO: move the authentication related functionality into host aliases and AuthenticationProviders + * + * @param address The (listening) socket address for which the AuthenticationManager is required + */ + SubjectCreator getSubjectCreator(SocketAddress localAddress); + + Collection<KeyStore> getKeyStores(); + + Collection<TrustStore> getTrustStores(); + + /* + * TODO: Remove this method. Eventually the broker will become a registry. + */ + VirtualHostRegistry getVirtualHostRegistry(); + + KeyStore getDefaultKeyStore(); + + TrustStore getDefaultTrustStore(); + + TaskExecutor getTaskExecutor(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java index 78b98faffe..bd7da962ba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java @@ -36,4 +36,5 @@ public interface ConfigurationChangeListener void childRemoved(ConfiguredObject object, ConfiguredObject child); + void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 414b2d083a..d567a3aa44 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -25,6 +25,9 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; +/** + * An object that can be "managed" (eg via the web interface) and usually read from configuration. + */ public interface ConfiguredObject { @@ -47,7 +50,7 @@ public interface ConfiguredObject * Attempt to change the name of the object * * Request a change to the name of the object. The caller must pass in the name it believes the object currently - * has. If the current name differes from this expected value, then no name change will occur + * has. If the current name differs from this expected value, then no name change will occur * * @param currentName the name the caller believes the object to have * @param desiredName the name the caller would like the object to have @@ -198,14 +201,25 @@ public interface ConfiguredObject /** - * Return the value for the given attribute + * Return the value for the given attribute name. The actual attribute value + * is returned if the configured object has such attribute set. If not, the + * value is looked default attributes. * - * @param name the name of the attribute - * @return the value of the attribute at the object (or null if the attribute is not set + * @param name + * the name of the attribute + * @return the value of the attribute at the object (or null if the + * attribute value is set neither on object itself no in defaults) */ Object getAttribute(String name); /** + * Return the map containing only explicitly set attributes + * + * @return the map with the attributes + */ + Map<String, Object> getActualAttributes(); + + /** * Set the value of an attribute * * @param name the name of the attribute to be set diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Group.java b/java/broker/src/main/java/org/apache/qpid/server/model/Group.java new file mode 100644 index 0000000000..aacd515107 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Group.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface Group extends ConfiguredObject +{ + String CREATED = "created"; + String DURABLE = "durable"; + String ID = "id"; + String LIFETIME_POLICY = "lifetimePolicy"; + String NAME = "name"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String UPDATED = "updated"; + + // Attributes + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED + )); + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java b/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java new file mode 100644 index 0000000000..6832cc6fa6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface GroupMember extends ConfiguredObject +{ + String CREATED = "created"; + String DURABLE = "durable"; + String ID = "id"; + String LIFETIME_POLICY = "lifetimePolicy"; + String NAME = "name"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String UPDATED = "updated"; + + // Attributes + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED + )); + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java b/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java new file mode 100644 index 0000000000..9016f97927 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.security.Principal; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +public interface GroupProvider extends ConfiguredObject +{ + public static final String ID = "id"; + public static final String DESCRIPTION = "description"; + public static final String NAME = "name"; + public static final String STATE = "state"; + public static final String DURABLE = "durable"; + public static final String LIFETIME_POLICY = "lifetimePolicy"; + public static final String TIME_TO_LIVE = "timeToLive"; + public static final String CREATED = "created"; + public static final String UPDATED = "updated"; + public static final String TYPE = "type"; + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList(ID, + NAME, + DESCRIPTION, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + TYPE)); + + Set<Principal> getGroupPrincipalsForUser(String username); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java b/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java new file mode 100644 index 0000000000..959714656b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface KeyStore extends TrustStore +{ + + String CERTIFICATE_ALIAS = "certificateAlias"; + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + DESCRIPTION, + PATH, + PASSWORD, + TYPE, + KEY_MANAGER_FACTORY_ALGORITHM, + CERTIFICATE_ALIAS + )); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java index 36179fc105..2c05dce9cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java @@ -47,6 +47,10 @@ public class Model addRelationship(Broker.class, VirtualHost.class); addRelationship(Broker.class, Port.class); addRelationship(Broker.class, AuthenticationProvider.class); + addRelationship(Broker.class, GroupProvider.class); + addRelationship(Broker.class, TrustStore.class); + addRelationship(Broker.class, KeyStore.class); + addRelationship(Broker.class, Plugin.class); addRelationship(VirtualHost.class, Exchange.class); addRelationship(VirtualHost.class, Queue.class); @@ -54,6 +58,10 @@ public class Model addRelationship(VirtualHost.class, VirtualHostAlias.class); addRelationship(AuthenticationProvider.class, User.class); + addRelationship(User.class, GroupMember.class); + + addRelationship(GroupProvider.class, Group.class); + addRelationship(Group.class, GroupMember.class); addRelationship(Connection.class, Session.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java b/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java new file mode 100644 index 0000000000..b9503a5841 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface Plugin extends ConfiguredObject +{ + //Hack, using it for the class name only for consistency with the other things. + String CREATED = "created"; + String DURABLE = "durable"; + String ID = "id"; + String LIFETIME_POLICY = "lifetimePolicy"; + String NAME = "name"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String UPDATED = "updated"; + + // Attributes + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED + )); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Port.java b/java/broker/src/main/java/org/apache/qpid/server/model/Port.java index 50c0ebcd14..2f94c3cab7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Port.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Port.java @@ -39,6 +39,16 @@ public interface Port extends ConfiguredObject String PORT = "port"; String PROTOCOLS = "protocols"; String TRANSPORTS = "transports"; + String TCP_NO_DELAY = "tcpNoDelay"; + String SEND_BUFFER_SIZE = "sendBufferSize"; + String RECEIVE_BUFFER_SIZE = "receiveBufferSize"; + String NEED_CLIENT_AUTH = "needClientAuth"; + String WANT_CLIENT_AUTH = "wantClientAuth"; + + /** + * TODO: rename it to AUTHENTICATION_MANAGER_ID or introduce relationships + */ + String AUTHENTICATION_MANAGER = "authenticationManager"; // Attributes public static final Collection<String> AVAILABLE_ATTRIBUTES = @@ -55,7 +65,13 @@ public interface Port extends ConfiguredObject BINDING_ADDRESS, PORT, PROTOCOLS, - TRANSPORTS + TRANSPORTS, + TCP_NO_DELAY, + SEND_BUFFER_SIZE, + RECEIVE_BUFFER_SIZE, + NEED_CLIENT_AUTH, + WANT_CLIENT_AUTH, + AUTHENTICATION_MANAGER )); @@ -88,4 +104,8 @@ public interface Port extends ConfiguredObject //children Collection<VirtualHostAlias> getVirtualHostBindings(); Collection<Connection> getConnections(); + + AuthenticationProvider getAuthenticationProvider(); + + void setAuthenticationProvider(AuthenticationProvider authenticationProvider); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java b/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java index 5d9de69f9a..6cd5eb23a4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java @@ -20,14 +20,95 @@ */ package org.apache.qpid.server.model; +import java.util.Collection; +import java.util.EnumSet; + +import org.apache.qpid.server.protocol.AmqpProtocolVersion; + public enum Protocol { - AMQP_0_8, - AMQP_0_9, - AMQP_0_9_1, - AMQP_0_10, - AMQP_1_0, - JMX, - HTTP, - HTTPS + AMQP_0_8(ProtocolType.AMQP), + AMQP_0_9(ProtocolType.AMQP), + AMQP_0_9_1(ProtocolType.AMQP), + AMQP_0_10(ProtocolType.AMQP), + AMQP_1_0(ProtocolType.AMQP), + JMX_RMI(ProtocolType.JMX), + HTTP(ProtocolType.HTTP), + HTTPS(ProtocolType.HTTP), + RMI(ProtocolType.RMI); + + private final ProtocolType _protocolType; + + private Protocol(ProtocolType type) + { + _protocolType = type; + } + + public ProtocolType getProtocolType() + { + return _protocolType; + } + + public boolean isAMQP() + { + return _protocolType == ProtocolType.AMQP; + } + + public AmqpProtocolVersion toAmqpProtocolVersion() + { + switch(this) + { + case AMQP_0_8: + return AmqpProtocolVersion.v0_8; + case AMQP_0_9: + return AmqpProtocolVersion.v0_9; + case AMQP_0_9_1: + return AmqpProtocolVersion.v0_9_1; + case AMQP_0_10: + return AmqpProtocolVersion.v0_10; + case AMQP_1_0: + return AmqpProtocolVersion.v1_0_0; + default: + throw new IllegalArgumentException(this + " is not an known AMQP protocol"); + } + } + + public static Protocol valueOfObject(Object protocolObject) + { + Protocol protocol; + if (protocolObject instanceof Protocol) + { + protocol = (Protocol) protocolObject; + } + else + { + try + { + protocol = Protocol.valueOf(String.valueOf(protocolObject)); + } + catch (Exception e) + { + throw new IllegalArgumentException("Can't convert '" + protocolObject + + "' to one of the supported protocols: " + EnumSet.allOf(Protocol.class), e); + } + } + return protocol; + } + + public static boolean hasAmqpProtocol(Collection<Protocol> protocols) + { + for (Protocol protocol : protocols) + { + if (protocol.isAMQP()) + { + return true; + } + } + return false; + } + + public static enum ProtocolType + { + AMQP, HTTP, JMX, RMI; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java b/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java index 03cd46be01..ae6e5ac43a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java @@ -20,8 +20,32 @@ */ package org.apache.qpid.server.model; +import java.util.EnumSet; + public enum Transport { TCP, - SSL + SSL; + + public static Transport valueOfObject(Object transportObject) + { + Transport transport; + if (transportObject instanceof Transport) + { + transport = (Transport) transportObject; + } + else + { + try + { + transport = Transport.valueOf(String.valueOf(transportObject)); + } + catch (Exception e) + { + throw new IllegalArgumentException("Can't convert '" + transportObject + + "' to one of the supported transports: " + EnumSet.allOf(Transport.class), e); + } + } + return transport; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java b/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java new file mode 100644 index 0000000000..0c322ae02f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface TrustStore extends ConfiguredObject +{ + String ID = "id"; + String NAME = "name"; + String DURABLE = "durable"; + String LIFETIME_POLICY = "lifetimePolicy"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String CREATED = "created"; + String UPDATED = "updated"; + String DESCRIPTION = "description"; + + String PATH = "path"; + String PASSWORD = "password"; + String TYPE = "type"; + String KEY_MANAGER_FACTORY_ALGORITHM = "keyManagerFactoryAlgorithm"; + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + DESCRIPTION, + PATH, + PASSWORD, + TYPE, + KEY_MANAGER_FACTORY_ALGORITHM + )); + + public String getPassword(); + + public void setPassword(String password); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java b/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java index 36b6a454dc..bcedd91596 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java @@ -63,6 +63,11 @@ public class UUIDGenerator return createUUID(User.class.getName(), authenticationProviderName, userName); } + public static UUID generateGroupUUID(String groupProviderName, String groupName) + { + return createUUID(Group.class.getName(), groupProviderName, groupName); + } + public static UUID generateVhostUUID(String virtualHostName) { return createUUID(VirtualHost.class.getName(), virtualHostName); @@ -77,4 +82,14 @@ public class UUIDGenerator { return createUUID(Consumer.class.getName(), virtualHostName, queueName, connectionRemoteAddress, channelNumber, consumerName); } + + public static UUID generateGroupMemberUUID(String groupProviderName, String groupName, String groupMemberName) + { + return createUUID(GroupMember.class.getName(), groupProviderName, groupName, groupMemberName); + } + + public static UUID generateBrokerChildUUID(String type, String childName) + { + return createUUID(type, childName); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/User.java b/java/broker/src/main/java/org/apache/qpid/server/model/User.java index d97bf46d31..675dc8f0d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/User.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/User.java @@ -52,8 +52,6 @@ public interface User extends ConfiguredObject PASSWORD )); - public String getPassword(); - public void setPassword(String password); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 24a3d43386..5f4ec1d3a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.model; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.MessageStore; + import java.security.AccessControlException; import java.util.Arrays; import java.util.Collection; @@ -60,17 +63,16 @@ public interface VirtualHost extends ConfiguredObject String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes"; String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; String DEAD_LETTER_QUEUE_ENABLED = "deadLetterQueueEnabled"; - String FEDERATION_TAG = "federationTag"; String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod"; String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; - String STORE_CONFIGURATION = "storeConfiguration"; String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose"; String STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "storeTransactionIdleTimeoutWarn"; String STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "storeTransactionOpenTimeoutClose"; String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn"; String STORE_TYPE = "storeType"; + String STORE_PATH = "storePath"; String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes"; String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes"; String CREATED = "created"; @@ -81,6 +83,8 @@ public interface VirtualHost extends ConfiguredObject String STATE = "state"; String TIME_TO_LIVE = "timeToLive"; String UPDATED = "updated"; + String CONFIG_PATH = "configPath"; + // Attributes public static final Collection<String> AVAILABLE_ATTRIBUTES = Collections.unmodifiableList( @@ -96,13 +100,12 @@ public interface VirtualHost extends ConfiguredObject SUPPORTED_EXCHANGE_TYPES, SUPPORTED_QUEUE_TYPES, DEAD_LETTER_QUEUE_ENABLED, - FEDERATION_TAG, HOUSEKEEPING_CHECK_PERIOD, MAXIMUM_DELIVERY_ATTEMPTS, QUEUE_FLOW_CONTROL_SIZE_BYTES, QUEUE_FLOW_RESUME_SIZE_BYTES, STORE_TYPE, - STORE_CONFIGURATION, + STORE_PATH, STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, STORE_TRANSACTION_IDLE_TIMEOUT_WARN, STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, @@ -111,7 +114,8 @@ public interface VirtualHost extends ConfiguredObject ALERT_THRESHOLD_MESSAGE_AGE, ALERT_THRESHOLD_MESSAGE_SIZE, ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, - ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)); + ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + CONFIG_PATH)); @@ -149,4 +153,12 @@ public interface VirtualHost extends ConfiguredObject } void executeTransaction(TransactionalOperation op); + + /** + * A temporary hack to expose host security manager. + * TODO We need to add and implement an authorization provider configured object instead + */ + SecurityManager getSecurityManager(); + + MessageStore getMessageStore(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java index 7d6aa9b2cb..73e1f1e970 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java @@ -24,12 +24,18 @@ import java.security.AccessControlException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; + import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.configuration.updater.ChangeStateTask; +import org.apache.qpid.server.configuration.updater.CreateChildTask; +import org.apache.qpid.server.configuration.updater.SetAttributeTask; +import org.apache.qpid.server.configuration.updater.TaskExecutor; abstract class AbstractAdapter implements ConfiguredObject { @@ -40,134 +46,78 @@ abstract class AbstractAdapter implements ConfiguredObject new ArrayList<ConfigurationChangeListener>(); private final UUID _id; + private final Map<String, Object> _defaultAttributes = new HashMap<String, Object>(); + private final TaskExecutor _taskExecutor; - protected AbstractAdapter(UUID id) + protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor) { + _taskExecutor = taskExecutor; _id = id; - } - - static String getStringAttribute(String name, Map<String,Object> attributes, String defaultVal) - { - final Object value = attributes.get(name); - return value == null ? defaultVal : String.valueOf(value); - } - - static Map getMapAttribute(String name, Map<String,Object> attributes, Map defaultVal) - { - final Object value = attributes.get(name); - if(value == null) - { - return defaultVal; - } - else if(value instanceof Map) + if (attributes != null) { - return (Map) value; + Collection<String> names = getAttributeNames(); + for (String name : names) + { + if (attributes.containsKey(name)) + { + _attributes.put(name, attributes.get(name)); + } + } } - else + if (defaults != null) { - throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Map"); + _defaultAttributes.putAll(defaults); } } - - static <E extends Enum> E getEnumAttribute(Class<E> clazz, String name, Map<String,Object> attributes, E defaultVal) + protected AbstractAdapter(UUID id, TaskExecutor taskExecutor) { - Object obj = attributes.get(name); - if(obj == null) - { - return defaultVal; - } - else if(clazz.isInstance(obj)) - { - return (E) obj; - } - else if(obj instanceof String) - { - return (E) Enum.valueOf(clazz, (String)obj); - } - else - { - throw new IllegalArgumentException("Value for attribute " + name + " is not of required type " + clazz.getSimpleName()); - } + this(id, null, null, taskExecutor); } - static Boolean getBooleanAttribute(String name, Map<String,Object> attributes, Boolean defaultValue) + public final UUID getId() { - Object obj = attributes.get(name); - if(obj == null) - { - return defaultValue; - } - else if(obj instanceof Boolean) - { - return (Boolean) obj; - } - else if(obj instanceof String) - { - return Boolean.parseBoolean((String) obj); - } - else - { - throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Boolean"); - } + return _id; } - static Integer getIntegerAttribute(String name, Map<String,Object> attributes, Integer defaultValue) + public State getDesiredState() { - Object obj = attributes.get(name); - if(obj == null) - { - return defaultValue; - } - else if(obj instanceof Number) - { - return ((Number) obj).intValue(); - } - else if(obj instanceof String) - { - return Integer.valueOf((String) obj); - } - else - { - throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Integer"); - } + return null; //TODO } - static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue) + @Override + public final State setDesiredState(final State currentState, final State desiredState) + throws IllegalStateTransitionException, AccessControlException { - Object obj = attributes.get(name); - if(obj == null) - { - return defaultValue; - } - else if(obj instanceof Number) + if (_taskExecutor.isTaskExecutorThread()) { - return ((Number) obj).longValue(); - } - else if(obj instanceof String) - { - return Long.valueOf((String) obj); + if (setState(currentState, desiredState)) + { + notifyStateChanged(currentState, desiredState); + } } else { - throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Long"); + _taskExecutor.submitAndWait(new ChangeStateTask(this, currentState, desiredState)); } + return getActualState(); } - public final UUID getId() - { - return _id; - } + /** + * @return true when the state has been successfully updated to desiredState or false otherwise + */ + protected abstract boolean setState(State currentState, State desiredState); - public State getDesiredState() + protected void notifyStateChanged(final State currentState, final State desiredState) { - return null; //TODO - } - - public State setDesiredState(final State currentState, final State desiredState) - throws IllegalStateTransitionException, AccessControlException - { - return null; //TODO + synchronized (_changeListeners) + { + List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners); + for(ConfigurationChangeListener listener : copy) + { + listener.stateChanged(this, currentState, desiredState); + } + } } public void addChangeListener(final ConfigurationChangeListener listener) @@ -176,7 +126,7 @@ abstract class AbstractAdapter implements ConfiguredObject { throw new NullPointerException("Cannot add a null listener"); } - synchronized (this) + synchronized (_changeListeners) { if(!_changeListeners.contains(listener)) { @@ -191,39 +141,76 @@ abstract class AbstractAdapter implements ConfiguredObject { throw new NullPointerException("Cannot remove a null listener"); } - synchronized (this) + synchronized (_changeListeners) { return _changeListeners.remove(listener); } } - protected void childAdded(ConfiguredObject child) { - synchronized (this) + synchronized (_changeListeners) { - for(ConfigurationChangeListener listener : _changeListeners) + List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners); + for(ConfigurationChangeListener listener : copy) { listener.childAdded(this, child); } } } - protected void childRemoved(ConfiguredObject child) { - synchronized (this) + synchronized (_changeListeners) { - for(ConfigurationChangeListener listener : _changeListeners) + List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners); + for(ConfigurationChangeListener listener : copy) { listener.childRemoved(this, child); } } } - public Object getAttribute(final String name) + protected void attributeSet(String attrinuteName, Object oldAttributeValue, Object newAttributeValue) { - synchronized (this) + synchronized (_changeListeners) + { + List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners); + for(ConfigurationChangeListener listener : copy) + { + listener.attributeSet(this, attrinuteName, oldAttributeValue, newAttributeValue); + } + } + } + + private final Object getDefaultAttribute(String name) + { + return _defaultAttributes.get(name); + } + + @Override + public Object getAttribute(String name) + { + Object value = getActualAttribute(name); + if (value == null) + { + value = getDefaultAttribute(name); + } + return value; + } + + @Override + public final Map<String, Object> getActualAttributes() + { + synchronized (_attributes) + { + return new HashMap<String, Object>(_attributes); + } + } + + private Object getActualAttribute(final String name) + { + synchronized (_attributes) { return _attributes.get(name); } @@ -232,25 +219,41 @@ abstract class AbstractAdapter implements ConfiguredObject public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { - synchronized (this) + if (_taskExecutor.isTaskExecutorThread()) { - Object currentValue = _attributes.get(name); + if (changeAttribute(name, expected, desired)) + { + attributeSet(name, expected, desired); + } + } + else + { + _taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired)); + } + return getAttribute(name); + } + + protected boolean changeAttribute(final String name, final Object expected, final Object desired) + { + synchronized (_attributes) + { + Object currentValue = getAttribute(name); if((currentValue == null && expected == null) || (currentValue != null && currentValue.equals(expected))) { _attributes.put(name, desired); - return desired; + return true; } else { - return currentValue; + return false; } } } public <T extends ConfiguredObject> T getParent(final Class<T> clazz) { - synchronized (this) + synchronized (_parents) { return (T) _parents.get(clazz); } @@ -258,7 +261,7 @@ abstract class AbstractAdapter implements ConfiguredObject protected <T extends ConfiguredObject> void addParent(Class<T> clazz, T parent) { - synchronized (this) + synchronized (_parents) { _parents.put(clazz, parent); } @@ -280,4 +283,40 @@ abstract class AbstractAdapter implements ConfiguredObject } } + @Override + public String toString() + { + return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]"; + } + + @SuppressWarnings("unchecked") + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + if (_taskExecutor.isTaskExecutorThread()) + { + C child = addChild(childClass, attributes, otherParents); + if (child != null) + { + childAdded(child); + } + return child; + } + else + { + return (C)_taskExecutor.submitAndWait(new CreateChildTask(this, childClass, attributes, otherParents)); + } + } + + protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } + + + protected TaskExecutor getTaskExecutor() + { + return _taskExecutor; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java new file mode 100644 index 0000000000..ebd98f915d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.security.AccessControlException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.KeyStore; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.TrustStore; +import org.apache.qpid.server.util.MapValueConverter; + +public abstract class AbstractKeyStoreAdapter extends AbstractAdapter +{ + private String _name; + private String _password; + + protected AbstractKeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes) + { + super(id, broker.getTaskExecutor()); + addParent(Broker.class, broker); + _name = MapValueConverter.getStringAttribute(TrustStore.NAME, attributes); + _password = MapValueConverter.getStringAttribute(TrustStore.PASSWORD, attributes); + setMandatoryAttribute(TrustStore.PATH, attributes); + setOptionalAttribute(TrustStore.TYPE, attributes); + setOptionalAttribute(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, attributes); + setOptionalAttribute(TrustStore.DESCRIPTION, attributes); + } + + @Override + public String getName() + { + return _name; + } + + @Override + public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException + { + throw new IllegalStateException(); + } + + @Override + public State getActualState() + { + return State.ACTIVE; + } + + @Override + public boolean isDurable() + { + return true; + } + + @Override + public void setDurable(boolean durable) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + throw new IllegalStateException(); + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return LifetimePolicy.PERMANENT; + } + + @Override + public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + throw new IllegalStateException(); + } + + @Override + public long getTimeToLive() + { + return 0; + } + + @Override + public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + throw new IllegalStateException(); + } + + @Override + public Statistics getStatistics() + { + return NoStatistics.getInstance(); + } + + @Override + public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) + { + return Collections.emptySet(); + } + + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } + + @Override + public Object getAttribute(String name) + { + if(KeyStore.ID.equals(name)) + { + return getId(); + } + else if(KeyStore.NAME.equals(name)) + { + return getName(); + } + else if(KeyStore.STATE.equals(name)) + { + return getActualState(); + } + else if(KeyStore.DURABLE.equals(name)) + { + return isDurable(); + } + else if(KeyStore.LIFETIME_POLICY.equals(name)) + { + return getLifetimePolicy(); + } + else if(KeyStore.TIME_TO_LIVE.equals(name)) + { + return getTimeToLive(); + } + else if(KeyStore.CREATED.equals(name)) + { + + } + else if(KeyStore.UPDATED.equals(name)) + { + + } + else if(KeyStore.PASSWORD.equals(name)) + { + return null; // for security reasons we don't expose the password + } + return super.getAttribute(name); + } + + @Override + protected boolean setState(State currentState, State desiredState) + { + return false; + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + private void setMandatoryAttribute(String name, Map<String, Object> attributeValues) + { + changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); + } + + private void setOptionalAttribute(String name, Map<String, Object> attributeValues) + { + if (attributeValues.get(name) != null) + { + changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java new file mode 100644 index 0000000000..ed4af9881f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.security.AccessControlException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Statistics; + +public abstract class AbstractPluginAdapter extends AbstractAdapter implements Plugin +{ + + protected AbstractPluginAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor) + { + super(id, defaults, attributes, taskExecutor); + } + + @Override + public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException + { + throw new UnsupportedOperationException(); + } + + @Override + public State getActualState() + { + return null; + } + + @Override + public boolean isDurable() + { + return true; + } + + @Override + public void setDurable(boolean durable) throws IllegalStateException, AccessControlException, IllegalArgumentException + { + throw new UnsupportedOperationException(); + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return LifetimePolicy.PERMANENT; + } + + @Override + public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + throw new UnsupportedOperationException(); + } + + @Override + public long getTimeToLive() + { + return 0; + } + + @Override + public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + throw new UnsupportedOperationException(); + } + + @Override + public Statistics getStatistics() + { + return null; + } + + @Override + public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) + { + return Collections.emptyList(); + } + + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, + ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<String> getAttributeNames() + { + return AVAILABLE_ATTRIBUTES; + } + + @Override + public Object getAttribute(String name) + { + if (ID.equals(name)) + { + return getId(); + } + else if (STATE.equals(name)) + { + return getActualState(); + } + else if (DURABLE.equals(name)) + { + return isDurable(); + } + else if (LIFETIME_POLICY.equals(name)) + { + return getLifetimePolicy(); + } + else if (TIME_TO_LIVE.equals(name)) + { + return getTimeToLive(); + } + else if (CREATED.equals(name)) + { + + } + else if (UPDATED.equals(name)) + { + + } + return super.getAttribute(name); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java new file mode 100644 index 0000000000..2f7e89bb2b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.net.ssl.SSLContext; + +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.KeyStore; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.TrustStore; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.IncomingNetworkTransport; + +public class AmqpPortAdapter extends PortAdapter +{ + private final Broker _broker; + private IncomingNetworkTransport _transport; + + public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor) + { + super(id, broker, attributes, defaultAttributes, taskExecutor); + _broker = broker; + } + + @Override + protected void onActivate() + { + Collection<Transport> transports = getTransports(); + Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols()); + + SSLContext sslContext = null; + if (transports.contains(Transport.SSL)) + { + sslContext = createSslContext(); + } + + AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply(); + + String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS); + if (WILDCARD_ADDRESS.equals(bindingAddress)) + { + bindingAddress = null; + } + Integer port = (Integer) getAttribute(Port.PORT); + InetSocketAddress bindingSocketAddress = null; + if ( bindingAddress == null ) + { + bindingSocketAddress = new InetSocketAddress(port); + } + else + { + bindingSocketAddress = new InetSocketAddress(bindingAddress, port); + } + + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration( + bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY), + (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE), + (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH)); + + _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( + _broker, supported, defaultSupportedProtocolReply); + + _transport.accept(settings, protocolEngineFactory, sslContext); + CurrentActor.get().message(BrokerMessages.LISTENING(getTransports().toString(), getPort())); + } + + @Override + protected void onStop() + { + if (_transport != null) + { + CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(getTransports().toString(), getPort())); + _transport.close(); + } + } + + private Set<AmqpProtocolVersion> convertFromModelProtocolsToAmqp(Collection<Protocol> modelProtocols) + { + Set<AmqpProtocolVersion> amqpProtocols = new HashSet<AmqpProtocolVersion>(); + for (Protocol protocol : modelProtocols) + { + amqpProtocols.add(protocol.toAmqpProtocolVersion()); + } + return amqpProtocols; + } + + private SSLContext createSslContext() + { + KeyStore keyStore = _broker.getDefaultKeyStore(); + if (keyStore == null) + { + throw new IllegalConfigurationException("SSL was requested on AMQP port '" + + this.getName() + "' but no key store defined"); + } + + TrustStore trustStore = _broker.getDefaultTrustStore(); + if (((Boolean)getAttribute(NEED_CLIENT_AUTH) || (Boolean)getAttribute(WANT_CLIENT_AUTH)) && trustStore == null) + { + throw new IllegalConfigurationException("Client certificate authentication is enabled on AMQP port '" + + this.getName() + "' but no trust store defined"); + } + + String keystorePath = (String)keyStore.getAttribute(KeyStore.PATH); + String keystorePassword = keyStore.getPassword(); + String keystoreType = (String)keyStore.getAttribute(KeyStore.TYPE); + String keyManagerFactoryAlgorithm = (String)keyStore.getAttribute(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM); + String certAlias = (String)keyStore.getAttribute(KeyStore.CERTIFICATE_ALIAS); + + final SSLContext sslContext; + try + { + if(trustStore != null) + { + String trustStorePassword = trustStore.getPassword(); + String trustStoreType = (String)trustStore.getAttribute(TrustStore.TYPE); + String trustManagerFactoryAlgorithm = (String)trustStore.getAttribute(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM); + String trustStorePath = (String)trustStore.getAttribute(TrustStore.PATH); + + sslContext = SSLContextFactory.buildClientContext(trustStorePath, + trustStorePassword, + trustStoreType, + trustManagerFactoryAlgorithm, + keystorePath, + keystorePassword, keystoreType, keyManagerFactoryAlgorithm, + certAlias); + } + else + { + sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm); + } + } + catch (GeneralSecurityException e) + { + throw new RuntimeException("Unable to create SSLContext for key or trust store", e); + } + catch (IOException e) + { + throw new RuntimeException("Unable to create SSLContext - unable to load key/trust store", e); + } + return sslContext; + } + + private AmqpProtocolVersion getDefaultAmqpSupportedReply() + { + String defaultAmqpSupportedReply = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY); + if (defaultAmqpSupportedReply != null) + { + return AmqpProtocolVersion.valueOf(defaultAmqpSupportedReply); + } + return null; + } + + + class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration + { + private final InetSocketAddress _bindingSocketAddress; + private final Boolean _tcpNoDelay; + private final Integer _sendBufferSize; + private final Integer _receiveBufferSize; + private final boolean _needClientAuth; + private final boolean _wantClientAuth; + + public ServerNetworkTransportConfiguration( + InetSocketAddress bindingSocketAddress, boolean tcpNoDelay, + int sendBufferSize, int receiveBufferSize, + boolean needClientAuth, boolean wantClientAuth) + { + _bindingSocketAddress = bindingSocketAddress; + _tcpNoDelay = tcpNoDelay; + _sendBufferSize = sendBufferSize; + _receiveBufferSize = receiveBufferSize; + _needClientAuth = needClientAuth; + _wantClientAuth = wantClientAuth; + } + + @Override + public boolean wantClientAuth() + { + return _wantClientAuth; + } + + @Override + public boolean needClientAuth() + { + return _needClientAuth; + } + + @Override + public Boolean getTcpNoDelay() + { + return _tcpNoDelay; + } + + @Override + public Integer getSendBufferSize() + { + return _sendBufferSize; + } + + @Override + public Integer getReceiveBufferSize() + { + return _receiveBufferSize; + } + + @Override + public InetSocketAddress getAddress() + { + return _bindingSocketAddress; + } + }; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java index 8c2bc98ba7..ac4b0255d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java @@ -29,38 +29,50 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; + import javax.security.auth.login.AccountNotFoundException; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.*; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProvider; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.User; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.group.GroupPrincipalAccessor; +import org.apache.qpid.server.security.SecurityManager; public abstract class AuthenticationProviderAdapter<T extends AuthenticationManager> extends AbstractAdapter implements AuthenticationProvider { private static final Logger LOGGER = Logger.getLogger(AuthenticationProviderAdapter.class); - private final BrokerAdapter _broker; private final T _authManager; + protected final Broker _broker; - private AuthenticationProviderAdapter(BrokerAdapter brokerAdapter, - final T authManager) - { - super(UUIDGenerator.generateRandomUUID()); - _broker = brokerAdapter; - _authManager = authManager; - } + private GroupPrincipalAccessor _groupAccessor; - public static AuthenticationProviderAdapter createAuthenticationProviderAdapter(BrokerAdapter brokerAdapter, - final AuthenticationManager authManager) + private Object _type; + + private AuthenticationProviderAdapter(UUID id, Broker broker, final T authManager, Map<String, Object> attributes) { - return authManager instanceof PrincipalDatabaseAuthenticationManager - ? new PrincipalDatabaseAuthenticationManagerAdapter(brokerAdapter, (PrincipalDatabaseAuthenticationManager) authManager) - : new SimpleAuthenticationProviderAdapter(brokerAdapter, authManager); + super(id, null, attributes, broker.getTaskExecutor()); + _authManager = authManager; + _broker = broker; + _type = authManager instanceof PrincipalDatabaseAuthenticationManager? PrincipalDatabaseAuthenticationManager.class.getSimpleName() : AuthenticationManager.class.getSimpleName() ; + addParent(Broker.class, broker); } T getAuthManager() @@ -77,7 +89,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana @Override public String getName() { - return _authManager.getClass().getSimpleName(); + return (String)getAttribute(AuthenticationProvider.NAME); } @Override @@ -147,7 +159,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana { if(TYPE.equals(name)) { - return _authManager.getClass().getSimpleName(); + return _type; } else if(CREATED.equals(name)) { @@ -165,10 +177,6 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana { return LifetimePolicy.PERMANENT; } - else if(NAME.equals(name)) - { - return getName(); - } else if(STATE.equals(name)) { return State.ACTIVE; // TODO @@ -191,44 +199,86 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, - Map<String, Object> attributes, - ConfiguredObject... otherParents) + public boolean setState(State currentState, State desiredState) + throws IllegalStateTransitionException, AccessControlException { - return null; + if(desiredState == State.DELETED) + { + return true; + } + else if(desiredState == State.ACTIVE) + { + if (_groupAccessor == null) + { + throw new IllegalStateTransitionException("Cannot transit into ACTIVE state with null group accessor!"); + } + _authManager.initialise(); + return true; + } + else if(desiredState == State.STOPPED) + { + _authManager.close(); + return true; + } + return false; } - private static class SimpleAuthenticationProviderAdapter extends AuthenticationProviderAdapter<AuthenticationManager> + @Override + public SubjectCreator getSubjectCreator() { + return new SubjectCreator(_authManager, _groupAccessor); + } + + public void setGroupAccessor(GroupPrincipalAccessor groupAccessor) + { + _groupAccessor = groupAccessor; + } + + public static class SimpleAuthenticationProviderAdapter extends AuthenticationProviderAdapter<AuthenticationManager> + { + public SimpleAuthenticationProviderAdapter( - BrokerAdapter brokerAdapter, AuthenticationManager authManager) + UUID id, Broker broker, AuthenticationManager authManager, Map<String, Object> attributes) + { + super(id, broker,authManager, attributes); + } + + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, + Map<String, Object> attributes, + ConfiguredObject... otherParents) { - super(brokerAdapter,authManager); + throw new UnsupportedOperationException(); } } - private static class PrincipalDatabaseAuthenticationManagerAdapter + public static class PrincipalDatabaseAuthenticationManagerAdapter extends AuthenticationProviderAdapter<PrincipalDatabaseAuthenticationManager> implements PasswordCredentialManagingAuthenticationProvider { public PrincipalDatabaseAuthenticationManagerAdapter( - BrokerAdapter brokerAdapter, PrincipalDatabaseAuthenticationManager authManager) + UUID id, Broker broker, PrincipalDatabaseAuthenticationManager authManager, Map<String, Object> attributes) { - super(brokerAdapter, authManager); + super(id, broker, authManager, attributes); } @Override public boolean createUser(String username, String password, Map<String, String> attributes) { - return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray()); + if(getSecurityManager().authoriseUserOperation(Operation.CREATE, username)) + { + return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray()); + } + else + { + throw new AccessControlException("Do not have permission to create new user"); + } } @Override public void deleteUser(String username) throws AccountNotFoundException { - if(getSecurityManager().authoriseMethod(Operation.DELETE, - "UserManagement", - "deleteUser")) + if(getSecurityManager().authoriseUserOperation(Operation.DELETE, username)) { getPrincipalDatabase().deletePrincipal(new UsernamePrincipal(username)); @@ -239,9 +289,9 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } } - private org.apache.qpid.server.security.SecurityManager getSecurityManager() + private SecurityManager getSecurityManager() { - return ApplicationRegistry.getInstance().getSecurityManager(); + return _broker.getSecurityManager(); } private PrincipalDatabase getPrincipalDatabase() @@ -252,18 +302,13 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana @Override public void setPassword(String username, String password) throws AccountNotFoundException { - getPrincipalDatabase().updatePassword(new UsernamePrincipal(username), password.toCharArray()); - } - - public void reload() throws IOException - { - if(getSecurityManager().authoriseMethod(Operation.UPDATE, "UserManagement", "reload")) + if(getSecurityManager().authoriseUserOperation(Operation.UPDATE, username)) { - getPrincipalDatabase().reload(); + getPrincipalDatabase().updatePassword(new UsernamePrincipal(username), password.toCharArray()); } else { - throw new AccessControlException("Do not have permission to reload principal database"); + throw new AccessControlException("Do not have permission to set password"); } } @@ -274,34 +319,41 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana Map<String, Map<String,String>> users = new HashMap<String, Map<String, String>>(); for(Principal principal : getPrincipalDatabase().getUsers()) { - users.put(principal.getName(), Collections.EMPTY_MAP); + users.put(principal.getName(), Collections.<String, String>emptyMap()); } return users; } + public void reload() throws IOException + { + getPrincipalDatabase().reload(); + } + @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == User.class) { - Principal p = new UsernamePrincipal((String) attributes.get("name")); - if(getSecurityManager().authoriseMethod(Operation.UPDATE, "UserManagement", "createUser")) + String username = (String) attributes.get("name"); + String password = (String) attributes.get("password"); + Principal p = new UsernamePrincipal(username); + + if(createUser(username, password,null)) { - if(getPrincipalDatabase().createPrincipal(p, ((String)attributes.get("password")).toCharArray())) - { - return (C) new PrincipalAdapter(p); - } + @SuppressWarnings("unchecked") + C pricipalAdapter = (C) new PrincipalAdapter(p, getTaskExecutor()); + return pricipalAdapter; } else { - throw new AccessControlException("Do not have permission to create a new user"); + //TODO? Silly interface on the PrincipalDatabase at fault + throw new RuntimeException("Failed to create user"); } - } - return super.createChild(childClass, attributes, otherParents); + return super.addChild(childClass, attributes, otherParents); } @Override @@ -313,9 +365,11 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana Collection<User> principals = new ArrayList<User>(users.size()); for(Principal user : users) { - principals.add(new PrincipalAdapter(user)); + principals.add(new PrincipalAdapter(user, getTaskExecutor())); } - return (Collection<C>) Collections.unmodifiableCollection(principals); + @SuppressWarnings("unchecked") + Collection<C> unmodifiablePrincipals = (Collection<C>) Collections.unmodifiableCollection(principals); + return unmodifiablePrincipals; } else { @@ -328,20 +382,14 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana private final Principal _user; - public PrincipalAdapter(Principal user) + public PrincipalAdapter(Principal user, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName())); + super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName()), taskExecutor); _user = user; } @Override - public String getPassword() - { - return null; - } - - @Override public void setPassword(String password) { try @@ -445,6 +493,10 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana { return getId(); } + else if(PASSWORD.equals(name)) + { + return null; // for security reasons we don't expose the password + } else if(NAME.equals(name)) { return getName(); @@ -453,20 +505,19 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } @Override - public Object setAttribute(String name, Object expected, Object desired) + public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { if(name.equals(PASSWORD)) { setPassword((String)desired); + return true; } - return super.setAttribute(name, - expected, - desired); + return super.changeAttribute(name, expected, desired); } @Override - public State setDesiredState(State currentState, State desiredState) + protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { if(desiredState == State.DELETED) @@ -479,9 +530,9 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana { LOGGER.warn("Failed to delete user " + _user, e); } - return State.DELETED; + return true; } - return super.setDesiredState(currentState, desiredState); + return false; } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java new file mode 100644 index 0000000000..e5108ebbcf --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.plugin.AuthenticationManagerFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.group.GroupPrincipalAccessor; +import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.PrincipalDatabaseAuthenticationManagerAdapter; +import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.SimpleAuthenticationProviderAdapter; + +public class AuthenticationProviderFactory +{ + private final Iterable<AuthenticationManagerFactory> _factories; + + public AuthenticationProviderFactory(QpidServiceLoader<AuthenticationManagerFactory> authManagerFactoryServiceLoader) + { + _factories = authManagerFactoryServiceLoader.atLeastOneInstanceOf(AuthenticationManagerFactory.class); + } + + /** + * Creates {@link AuthenticationProvider} for given ID, {@link Broker} and attributes. + * <p> + * The configured {@link AuthenticationManagerFactory}'s are used to try to create the {@link AuthenticationProvider}. + * The first non-null instance is returned. The factories are used in non-deterministic order. + * @param groupPrincipalAccessor TODO + */ + public AuthenticationProvider create(UUID id, Broker broker, Map<String, Object> attributes, GroupPrincipalAccessor groupPrincipalAccessor) + { + for (AuthenticationManagerFactory factory : _factories) + { + AuthenticationManager manager = factory.createInstance(attributes); + if (manager != null) + { + AuthenticationProviderAdapter<?> authenticationProvider; + if (manager instanceof PrincipalDatabaseAuthenticationManager) + { + authenticationProvider = new PrincipalDatabaseAuthenticationManagerAdapter(id, broker, + (PrincipalDatabaseAuthenticationManager) manager, attributes); + } + else + { + authenticationProvider = new SimpleAuthenticationProviderAdapter(id, broker, manager, attributes); + } + authenticationProvider.setGroupAccessor(groupPrincipalAccessor); + return authenticationProvider; + } + } + + throw new IllegalArgumentException("No authentication provider factory found for configuration attributes " + attributes); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index abd3160686..eb2d0dd7e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -48,7 +48,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding ExchangeAdapter exchangeAdapter, QueueAdapter queueAdapter) { - super(binding.getId()); + super(binding.getId(), queueAdapter.getTaskExecutor()); _binding = binding; _exchange = exchangeAdapter; _queue = queueAdapter; @@ -206,27 +206,20 @@ final class BindingAdapter extends AbstractAdapter implements Binding } @Override - public Object setAttribute(final String name, final Object expected, final Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - } - - @Override public Collection<String> getAttributeNames() { return Binding.AVAILABLE_ATTRIBUTES; } @Override - public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { if (desiredState == State.DELETED) { delete(); - return State.DELETED; + return true; } - return super.setDesiredState(currentState, desiredState); + return false; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index f1cce2d45c..533ecfe937 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -21,82 +21,211 @@ package org.apache.qpid.server.model.adapter; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.security.AccessControlException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.UUID; + +import javax.net.ssl.KeyManagerFactory; + +import org.apache.log4j.Logger; import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.GroupProvider; +import org.apache.qpid.server.model.KeyStore; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.group.FileGroupManager; +import org.apache.qpid.server.security.group.GroupManager; +import org.apache.qpid.server.security.group.GroupPrincipalAccessor; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHostRegistry.RegistryChangeListener, - IApplicationRegistry.PortBindingListener, - IAuthenticationManagerRegistry.RegistryChangeListener +public class BrokerAdapter extends AbstractAdapter implements Broker, ConfigurationChangeListener { - - private final IApplicationRegistry _applicationRegistry; - private String _name; - private final Map<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter> _vhostAdapters = - new HashMap<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter>(); - private final StatisticsAdapter _statistics; - private final Map<QpidAcceptor, PortAdapter> _portAdapters = new HashMap<QpidAcceptor, PortAdapter>(); - private Collection<HTTPPortAdapter> _httpManagementPorts; - - private final Map<AuthenticationManager, AuthenticationProviderAdapter> _authManagerAdapters = - new HashMap<AuthenticationManager, AuthenticationProviderAdapter>(); - - - public BrokerAdapter(final IApplicationRegistry instance) - { - super(UUIDGenerator.generateRandomUUID()); - _applicationRegistry = instance; - _name = "Broker"; - _statistics = new StatisticsAdapter(instance); - - instance.getVirtualHostRegistry().addRegistryChangeListener(this); - populateVhosts(); - instance.addPortBindingListener(this); - populatePorts(); - instance.addRegistryChangeListener(this); - populateAuthenticationManagers(); - } - - private void populateVhosts() - { - synchronized(_vhostAdapters) - { - Collection<org.apache.qpid.server.virtualhost.VirtualHost> actualVhosts = - _applicationRegistry.getVirtualHostRegistry().getVirtualHosts(); - for(org.apache.qpid.server.virtualhost.VirtualHost vh : actualVhosts) - { - if(!_vhostAdapters.containsKey(vh)) - { - _vhostAdapters.put(vh, new VirtualHostAdapter(this, vh)); - } - } - + private static final Logger LOGGER = Logger.getLogger(BrokerAdapter.class); + + @SuppressWarnings("serial") + public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{ + put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class); + put(ALERT_THRESHOLD_MESSAGE_COUNT, Long.class); + put(ALERT_THRESHOLD_QUEUE_DEPTH, Long.class); + put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class); + put(ALERT_REPEAT_GAP, Long.class); + put(FLOW_CONTROL_SIZE_BYTES, Long.class); + put(FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class); + put(HOUSEKEEPING_CHECK_PERIOD, Long.class); + + put(DEAD_LETTER_QUEUE_ENABLED, Boolean.class); + put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class); + + put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); + put(SESSION_COUNT_LIMIT, Integer.class); + put(HEART_BEAT_DELAY, Integer.class); + put(STATISTICS_REPORTING_PERIOD, Integer.class); + + put(ACL_FILE, String.class); + put(NAME, String.class); + put(DEFAULT_VIRTUAL_HOST, String.class); + put(DEFAULT_AUTHENTICATION_PROVIDER, String.class); + + put(KEY_STORE_PATH, String.class); + put(KEY_STORE_PASSWORD, String.class); + put(KEY_STORE_CERT_ALIAS, String.class); + put(TRUST_STORE_PATH, String.class); + put(TRUST_STORE_PASSWORD, String.class); + put(GROUP_FILE, String.class); + }}); + + public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0; + public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false; + public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l; + public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; + public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0; + public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l; + public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l; + public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l; + public static final int DEFAULT_HEART_BEAT_DELAY = 0; + public static final int DEFAULT_SESSION_COUNT_LIMIT = 256; + public static final String DEFAULT_NAME = "QpidBroker"; + private static final String DEFAULT_KEY_STORE_NAME = "defaultKeyStore"; + private static final String DEFAULT_TRUST_STORE_NAME = "defaultTrustStore"; + private static final String DEFAULT_GROUP_PROFIDER_NAME = "defaultGroupProvider"; + + private static final String DUMMY_PASSWORD_MASK = "********"; + + @SuppressWarnings("serial") + private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{ + put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD); + put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED); + put(Broker.ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP); + put(Broker.ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE); + put(Broker.ALERT_THRESHOLD_MESSAGE_COUNT, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT); + put(Broker.ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE); + put(Broker.ALERT_THRESHOLD_QUEUE_DEPTH, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH); + put(Broker.DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED); + put(Broker.MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS); + put(Broker.FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES); + put(Broker.FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES); + put(Broker.HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD); + put(Broker.HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY); + put(Broker.SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT); + put(Broker.NAME, DEFAULT_NAME); + }}); + + + + + private final StatisticsGatherer _statisticsGatherer; + private final VirtualHostRegistry _virtualHostRegistry; + private final LogRecorder _logRecorder; + private final RootMessageLogger _rootMessageLogger; + private StatisticsAdapter _statistics; + + private final Map<String, VirtualHost> _vhostAdapters = new HashMap<String, VirtualHost>(); + private final Map<Integer, Port> _portAdapters = new HashMap<Integer, Port>(); + private final Map<String, AuthenticationProvider> _authenticationProviders = new HashMap<String, AuthenticationProvider>(); + private final Map<String, GroupProvider> _groupProviders = new HashMap<String, GroupProvider>(); + private final Map<UUID, ConfiguredObject> _plugins = new HashMap<UUID, ConfiguredObject>(); + private final Map<UUID, KeyStore> _keyStores = new HashMap<UUID, KeyStore>(); + private final Map<UUID, TrustStore> _trustStores = new HashMap<UUID, TrustStore>(); + + private final AuthenticationProviderFactory _authenticationProviderFactory; + private AuthenticationProvider _defaultAuthenticationProvider; + + private final PortFactory _portFactory; + private final SecurityManager _securityManager; + private final UUID _defaultKeyStoreId; + private final UUID _defaultTrustStoreId; + + public BrokerAdapter(UUID id, Map<String, Object> attributes, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry, + LogRecorder logRecorder, RootMessageLogger rootMessageLogger, AuthenticationProviderFactory authenticationProviderFactory, + PortFactory portFactory, TaskExecutor taskExecutor) + { + super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); + _statisticsGatherer = statisticsGatherer; + _virtualHostRegistry = virtualHostRegistry; + _logRecorder = logRecorder; + _rootMessageLogger = rootMessageLogger; + _statistics = new StatisticsAdapter(statisticsGatherer); + _authenticationProviderFactory = authenticationProviderFactory; + _portFactory = portFactory; + _securityManager = new SecurityManager((String)getAttribute(ACL_FILE)); + + _defaultKeyStoreId = UUIDGenerator.generateBrokerChildUUID(KeyStore.class.getSimpleName(), DEFAULT_KEY_STORE_NAME); + _defaultTrustStoreId = UUIDGenerator.generateBrokerChildUUID(TrustStore.class.getSimpleName(), DEFAULT_TRUST_STORE_NAME); + createBrokerChildrenFromAttributes(); + } + + /* + * A temporary method to create broker children that can be only configured via broker attributes + */ + private void createBrokerChildrenFromAttributes() + { + String groupFile = (String) getAttribute(GROUP_FILE); + if (groupFile != null) + { + GroupManager groupManager = new FileGroupManager(groupFile); + UUID groupProviderId = UUIDGenerator.generateBrokerChildUUID(GroupProvider.class.getSimpleName(), + DEFAULT_GROUP_PROFIDER_NAME); + GroupProviderAdapter groupProviderAdapter = new GroupProviderAdapter(groupProviderId, groupManager, this); + addGroupProvider(groupProviderAdapter); + } + Map<String, Object> actualAttributes = getActualAttributes(); + String keyStorePath = (String) getAttribute(KEY_STORE_PATH); + if (keyStorePath != null) + { + Map<String, Object> keyStoreAttributes = new HashMap<String, Object>(); + keyStoreAttributes.put(KeyStore.NAME, DEFAULT_KEY_STORE_NAME); + keyStoreAttributes.put(KeyStore.PATH, keyStorePath); + keyStoreAttributes.put(KeyStore.PASSWORD, (String) actualAttributes.get(KEY_STORE_PASSWORD)); + keyStoreAttributes.put(KeyStore.TYPE, java.security.KeyStore.getDefaultType()); + keyStoreAttributes.put(KeyStore.CERTIFICATE_ALIAS, getAttribute(KEY_STORE_CERT_ALIAS)); + keyStoreAttributes.put(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm()); + KeyStoreAdapter KeyStoreAdapter = new KeyStoreAdapter(_defaultKeyStoreId, this, keyStoreAttributes); + addKeyStore(KeyStoreAdapter); + } + String trustStorePath = (String) getAttribute(TRUST_STORE_PATH); + if (trustStorePath != null) + { + Map<String, Object> trsustStoreAttributes = new HashMap<String, Object>(); + trsustStoreAttributes.put(TrustStore.NAME, DEFAULT_TRUST_STORE_NAME); + trsustStoreAttributes.put(TrustStore.PATH, trustStorePath); + trsustStoreAttributes.put(TrustStore.PASSWORD, (String) actualAttributes.get(TRUST_STORE_PASSWORD)); + trsustStoreAttributes.put(TrustStore.TYPE, java.security.KeyStore.getDefaultType()); + trsustStoreAttributes.put(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm()); + TrustStoreAdapter trustStore = new TrustStoreAdapter(_defaultTrustStoreId, this, trsustStoreAttributes); + addTrustStore(trustStore); } } - public Collection<VirtualHost> getVirtualHosts() { synchronized(_vhostAdapters) @@ -105,81 +234,57 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos } } - private void populatePorts() + + public Collection<Port> getPorts() { synchronized (_portAdapters) { - Map<InetSocketAddress, QpidAcceptor> acceptors = _applicationRegistry.getAcceptors(); - - for(Map.Entry<InetSocketAddress, QpidAcceptor> entry : acceptors.entrySet()) - { - if(!_portAdapters.containsKey(entry.getValue())) - { - _portAdapters.put(entry.getValue(), new PortAdapter(this, entry.getValue(), entry.getKey())); - } - } - if(_applicationRegistry.useHTTPManagement() || _applicationRegistry.useHTTPSManagement()) - { - ArrayList<HTTPPortAdapter> httpPorts = new ArrayList<HTTPPortAdapter>(); - if (_applicationRegistry.useHTTPManagement()) - { - httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPManagementPort())); - } - if (_applicationRegistry.useHTTPSManagement()) - { - httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPSManagementPort(), Protocol.HTTPS, Transport.SSL)); - } - _httpManagementPorts = Collections.unmodifiableCollection(httpPorts); - } + final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values()); + return ports; } } - public Collection<Port> getPorts() + public Collection<AuthenticationProvider> getAuthenticationProviders() { - synchronized (_portAdapters) + synchronized (_authenticationProviders) { - final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values()); - if(_httpManagementPorts != null) - { - ports.addAll(_httpManagementPorts); - } - return ports; + return new ArrayList<AuthenticationProvider>(_authenticationProviders.values()); } } - private void populateAuthenticationManagers() + public AuthenticationProvider getAuthenticationProviderByName(String authenticationProviderName) { - synchronized (_authManagerAdapters) + Collection<AuthenticationProvider> providers = getAuthenticationProviders(); + for (AuthenticationProvider authenticationProvider : providers) { - IAuthenticationManagerRegistry authenticationManagerRegistry = - _applicationRegistry.getAuthenticationManagerRegistry(); - if(authenticationManagerRegistry != null) + if (authenticationProvider.getName().equals(authenticationProviderName)) { - Map<String, AuthenticationManager> authenticationManagers = - authenticationManagerRegistry.getAvailableAuthenticationManagers(); - - for(Map.Entry<String, AuthenticationManager> entry : authenticationManagers.entrySet()) - { - if(!_authManagerAdapters.containsKey(entry.getValue())) - { - _authManagerAdapters.put(entry.getValue(), - AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, - entry.getValue())); - } - } + return authenticationProvider; } } + return null; } - public Collection<AuthenticationProvider> getAuthenticationProviders() + @Override + public AuthenticationProvider getDefaultAuthenticationProvider() + { + return _defaultAuthenticationProvider; + } + + public void setDefaultAuthenticationProvider(AuthenticationProvider provider) + { + _defaultAuthenticationProvider = provider; + } + + @Override + public Collection<GroupProvider> getGroupProviders() { - synchronized (_authManagerAdapters) + synchronized (_groupProviders) { - final ArrayList<AuthenticationProvider> authManagers = - new ArrayList<AuthenticationProvider>(_authManagerAdapters.values()); - return authManagers; + final ArrayList<GroupProvider> groupManagers = + new ArrayList<GroupProvider>(_groupProviders.values()); + return groupManagers; } - } public VirtualHost createVirtualHost(final String name, @@ -193,22 +298,29 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos return null; //TODO } - public VirtualHost createVirtualHost(final Map<String, Object> attributes) + private VirtualHost createVirtualHost(final Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException { - return null; //TODO + final VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(UUID.randomUUID(), attributes, this, + _statisticsGatherer, getTaskExecutor()); + addVirtualHost(virtualHostAdapter); + virtualHostAdapter.setDesiredState(State.INITIALISING, State.ACTIVE); + return virtualHostAdapter; } - public void deleteVirtualHost(final VirtualHost vhost) - throws AccessControlException, IllegalStateException + private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException { - //TODO - throw new UnsupportedOperationException("Not yet implemented"); + synchronized (_vhostAdapters) + { + _vhostAdapters.remove(vhost); + } + vhost.removeChangeListener(this); + return true; } public String getName() { - return _name; + return (String)getAttribute(NAME); } public String setName(final String currentName, final String desiredName) @@ -262,6 +374,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos return _statistics; } + @SuppressWarnings("unchecked") @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { @@ -277,12 +390,30 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { return (Collection<C>) getAuthenticationProviders(); } + else if(clazz == GroupProvider.class) + { + return (Collection<C>) getGroupProviders(); + } + else if(clazz == KeyStore.class) + { + return (Collection<C>) getKeyStores(); + } + else if(clazz == TrustStore.class) + { + return (Collection<C>) getTrustStores(); + } + else if(clazz == Plugin.class) + { + return (Collection<C>) getPlugins(); + } return Collections.emptySet(); } + //TODO: ACL + @SuppressWarnings("unchecked") @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == VirtualHost.class) { @@ -302,111 +433,107 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos } } - private Port createPort(Map<String, Object> attributes) + private void addPort(Port port) { - // TODO - return null; + synchronized (_portAdapters) + { + int portNumber = port.getPort(); + if(_portAdapters.containsKey(portNumber)) + { + throw new IllegalArgumentException("Cannot add port " + port + " because port number " + portNumber + " already configured"); + } + _portAdapters.put(portNumber, port); + } + port.addChangeListener(this); } - private AuthenticationProvider createAuthenticationProvider(Map<String,Object> attributes) + private Port createPort(Map<String, Object> attributes) { - // TODO - return null; + Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes); + addPort(port); + return port; } + private AuthenticationProvider createAuthenticationProvider(Map<String, Object> attributes) + { + // it's cheap to create the groupPrincipalAccessor on the fly + GroupPrincipalAccessor groupPrincipalAccessor = new GroupPrincipalAccessor(_groupProviders.values()); + + AuthenticationProvider authenticationProvider = _authenticationProviderFactory.create(UUID.randomUUID(), this, attributes, groupPrincipalAccessor); + addAuthenticationProvider(authenticationProvider); + return authenticationProvider; + } - public void virtualHostRegistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost) + /** + * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists + */ + private void addAuthenticationProvider(AuthenticationProvider authenticationProvider) { - VirtualHostAdapter adapter = null; - synchronized (_vhostAdapters) + String name = authenticationProvider.getName(); + synchronized (_authenticationProviders) { - if(!_vhostAdapters.containsKey(virtualHost)) + if(_authenticationProviders.containsKey(name)) { - adapter = new VirtualHostAdapter(this, virtualHost); - _vhostAdapters.put(virtualHost, adapter); + throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists"); } + _authenticationProviders.put(name, authenticationProvider); } - if(adapter != null) - { - childAdded(adapter); - } + authenticationProvider.addChangeListener(this); } - public void virtualHostUnregistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost) + private void addGroupProvider(GroupProvider groupProvider) { - VirtualHostAdapter adapter = null; - - synchronized (_vhostAdapters) - { - adapter = _vhostAdapters.remove(virtualHost); - } - if(adapter != null) + synchronized (_groupProviders) { - childRemoved(adapter); + String name = groupProvider.getName(); + if(_groupProviders.containsKey(name)) + { + throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists"); + } + _groupProviders.put(name, groupProvider); } + groupProvider.addChangeListener(this); } - @Override - public void authenticationManagerRegistered(AuthenticationManager authenticationManager) + private boolean deleteGroupProvider(GroupProvider object) + { + throw new UnsupportedOperationException("Not implemented yet!"); + } + + private void addKeyStore(KeyStore keyStore) { - AuthenticationProviderAdapter adapter = null; - synchronized (_authManagerAdapters) + synchronized (_keyStores) { - if(!_authManagerAdapters.containsKey(authenticationManager)) + if(_keyStores.containsKey(keyStore.getId())) { - adapter = - AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, authenticationManager); - _authManagerAdapters.put(authenticationManager, adapter); + throw new IllegalConfigurationException("Cannot add KeyStore because one with id " + keyStore.getId() + " already exists"); } + _keyStores.put(keyStore.getId(), keyStore); } - if(adapter != null) - { - childAdded(adapter); - } + keyStore.addChangeListener(this); } - @Override - public void authenticationManagerUnregistered(AuthenticationManager authenticationManager) + private boolean deleteKeyStore(KeyStore object) { - AuthenticationProviderAdapter adapter; - synchronized (_authManagerAdapters) - { - adapter = _authManagerAdapters.remove(authenticationManager); - } - if(adapter != null) - { - childRemoved(adapter); - } + throw new UnsupportedOperationException("Not implemented yet!"); } - - @Override - public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress) + private void addTrustStore(TrustStore trustStore) { - synchronized (_portAdapters) + synchronized (_trustStores) { - if(!_portAdapters.containsKey(acceptor)) + if(_trustStores.containsKey(trustStore.getId())) { - PortAdapter adapter = new PortAdapter(this, acceptor, bindAddress); - _portAdapters.put(acceptor, adapter); - childAdded(adapter); + throw new IllegalConfigurationException("Cannot add TrustStore because one with id " + trustStore.getId() + " already exists"); } + _trustStores.put(trustStore.getId(), trustStore); } + trustStore.addChangeListener(this); } - @Override - public void unbound(QpidAcceptor acceptor) + private boolean deleteTrustStore(TrustStore object) { - PortAdapter adapter = null; - - synchronized (_portAdapters) - { - adapter = _portAdapters.remove(acceptor); - } - if(adapter != null) - { - childRemoved(adapter); - } + throw new UnsupportedOperationException("Not implemented yet!"); } @Override @@ -422,10 +549,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { return getId(); } - else if(NAME.equals(name)) - { - return getName(); - } else if(STATE.equals(name)) { return State.ACTIVE; @@ -481,14 +604,316 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { // TODO } + else if (DEFAULT_AUTHENTICATION_PROVIDER.equals(name)) + { + return _defaultAuthenticationProvider == null ? null : _defaultAuthenticationProvider.getName(); + } + else if (KEY_STORE_PASSWORD.equals(name)) + { + return DUMMY_PASSWORD_MASK; + } + else if (TRUST_STORE_PASSWORD.equals(name)) + { + return DUMMY_PASSWORD_MASK; + } + return super.getAttribute(name); + } - return super.getAttribute(name); //TODO - Implement. + private boolean deletePort(Port portAdapter) + { + Port removedPort = null; + synchronized (_portAdapters) + { + removedPort = _portAdapters.remove(portAdapter.getPort()); + } + return removedPort != null; + } + + private boolean deleteAuthenticationProvider(AuthenticationProvider authenticationProvider) + { + AuthenticationProvider removedAuthenticationProvider = null; + synchronized (_authenticationProviders) + { + removedAuthenticationProvider = _authenticationProviders.remove(authenticationProvider.getName()); + } + return removedAuthenticationProvider != null; + } + + private void addVirtualHost(VirtualHost virtualHost) + { + synchronized (_vhostAdapters) + { + String name = virtualHost.getName(); + if (_vhostAdapters.containsKey(name)) + { + throw new IllegalConfigurationException("Virtual host with name " + name + " is already specified!"); + } + _vhostAdapters.put(name, virtualHost); + } + virtualHost.addChangeListener(this); } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException + public boolean setState(State currentState, State desiredState) + { + if (desiredState == State.ACTIVE) + { + changeState(_groupProviders, currentState, State.ACTIVE, false); + changeState(_authenticationProviders, currentState, State.ACTIVE, false); + + CurrentActor.set(new BrokerActor(getRootMessageLogger())); + try + { + changeState(_vhostAdapters, currentState, State.ACTIVE, false); + } + finally + { + CurrentActor.remove(); + } + + changeState(_portAdapters, currentState,State.ACTIVE, false); + changeState(_plugins, currentState,State.ACTIVE, false); + return true; + } + else if (desiredState == State.STOPPED) + { + changeState(_plugins, currentState,State.STOPPED, true); + changeState(_portAdapters, currentState, State.STOPPED, true); + changeState(_vhostAdapters,currentState, State.STOPPED, true); + changeState(_authenticationProviders, currentState, State.STOPPED, true); + changeState(_groupProviders, currentState, State.STOPPED, true); + return true; + } + return false; + } + + private void changeState(Map<?, ? extends ConfiguredObject> configuredObjectMap, State currentState, State desiredState, boolean swallowException) + { + synchronized(configuredObjectMap) + { + Collection<? extends ConfiguredObject> adapters = configuredObjectMap.values(); + for (ConfiguredObject configuredObject : adapters) + { + if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getActualState())) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(configuredObject + " cannot be activated as it is " +State.QUIESCED); + } + continue; + } + try + { + configuredObject.setDesiredState(currentState, desiredState); + } + catch(RuntimeException e) + { + if (swallowException) + { + LOGGER.error("Failed to stop " + configuredObject, e); + } + else + { + throw e; + } + } + } + } + } + + @Override + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + if(newState == State.DELETED) + { + boolean childDeleted = false; + if(object instanceof AuthenticationProvider) + { + childDeleted = deleteAuthenticationProvider((AuthenticationProvider)object); + } + else if(object instanceof Port) + { + childDeleted = deletePort((Port)object); + } + else if(object instanceof VirtualHost) + { + childDeleted = deleteVirtualHost((VirtualHost)object); + } + else if(object instanceof GroupProvider) + { + childDeleted = deleteGroupProvider((GroupProvider)object); + } + else if(object instanceof KeyStore) + { + childDeleted = deleteKeyStore((KeyStore)object); + } + else if(object instanceof TrustStore) + { + childDeleted = deleteTrustStore((TrustStore)object); + } + if(childDeleted) + { + childRemoved(object); + } + } + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) + { + // no-op + } + + private void addPlugin(ConfiguredObject plugin) + { + synchronized(_plugins) + { + if (_plugins.containsKey(plugin.getId())) + { + throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!"); + } + _plugins.put(plugin.getId(), plugin); + } + plugin.addChangeListener(this); + } + + + private Collection<ConfiguredObject> getPlugins() + { + synchronized(_plugins) + { + return Collections.unmodifiableCollection(_plugins.values()); + } + } + + public void recoverChild(ConfiguredObject object) + { + if(object instanceof AuthenticationProvider) + { + addAuthenticationProvider((AuthenticationProvider)object); + } + else if(object instanceof Port) + { + addPort((Port)object); + } + else if(object instanceof VirtualHost) + { + addVirtualHost((VirtualHost)object); + } + else if(object instanceof GroupProvider) + { + addGroupProvider((GroupProvider)object); + } + else if(object instanceof KeyStore) + { + addKeyStore((KeyStore)object); + } + else if(object instanceof TrustStore) + { + addTrustStore((TrustStore)object); + } + else if(object instanceof Plugin) + { + addPlugin(object); + } + else + { + throw new IllegalArgumentException("Attempted to recover unexpected type of configured object: " + object.getClass().getName()); + } + } + + @Override + public RootMessageLogger getRootMessageLogger() + { + return _rootMessageLogger; + } + + @Override + public SecurityManager getSecurityManager() + { + return _securityManager; + } + + @Override + public LogRecorder getLogRecorder() + { + return _logRecorder; + } + + @Override + public VirtualHost findVirtualHostByName(String name) + { + return _vhostAdapters.get(name); + } + + @Override + public SubjectCreator getSubjectCreator(SocketAddress localAddress) + { + InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress; + AuthenticationProvider provider = _defaultAuthenticationProvider; + Collection<Port> ports = getPorts(); + for (Port p : ports) + { + if (inetSocketAddress.getPort() == p.getPort()) + { + provider = p.getAuthenticationProvider(); + break; + } + } + return provider.getSubjectCreator(); + } + + @Override + public Collection<KeyStore> getKeyStores() + { + synchronized(_trustStores) + { + return Collections.unmodifiableCollection(_keyStores.values()); + } + } + + @Override + public Collection<TrustStore> getTrustStores() + { + synchronized(_trustStores) + { + return Collections.unmodifiableCollection(_trustStores.values()); + } + } + + @Override + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + @Override + public KeyStore getDefaultKeyStore() + { + return _keyStores.get(_defaultKeyStoreId); + } + + @Override + public TrustStore getDefaultTrustStore() + { + return _trustStores.get(_defaultTrustStoreId); + } + + @Override + public TaskExecutor getTaskExecutor() { - return super.setAttribute(name, expected, desired); //TODO - Implement. + return super.getTaskExecutor(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 5439f6a560..84f99e1f17 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -50,9 +51,9 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection new HashMap<AMQSessionModel, SessionAdapter>(); private final Statistics _statistics; - public ConnectionAdapter(final AMQConnectionModel conn) + public ConnectionAdapter(final AMQConnectionModel conn, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateRandomUUID()); + super(UUIDGenerator.generateRandomUUID(), taskExecutor); _connection = conn; _statistics = new ConnectionStatisticsAdapter(conn); } @@ -74,7 +75,7 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection { if(!_sessionAdapters.containsKey(session)) { - _sessionAdapters.put(session, new SessionAdapter(session)); + _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor())); } } return new ArrayList<Session>(_sessionAdapters.values()); @@ -199,52 +200,6 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } @Override - public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException - { - if(name.equals(CLIENT_ID)) - { - - } - else if(name.equals(CLIENT_VERSION)) - { - - } - else if(name.equals(INCOMING)) - { - - } - else if(name.equals(LOCAL_ADDRESS)) - { - - } - else if(name.equals(PRINCIPAL)) - { - - } - else if(name.equals(PROPERTIES)) - { - - } - else if(name.equals(REMOTE_ADDRESS)) - { - - } - else if(name.equals(REMOTE_PROCESS_NAME)) - { - - } - else if(name.equals(REMOTE_PROCESS_PID)) - { - - } - else if(name.equals(SESSION_COUNT_LIMIT)) - { - - } - return super.setAttribute(name, expected, desired); - } - - @Override public Collection<String> getAttributeNames() { final HashSet<String> attrNames = new HashSet<String>(super.getAttributeNames()); @@ -270,7 +225,8 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } } - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + @Override + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Session.class) { @@ -310,4 +266,11 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection return super.getStatistic(name); } } + + @Override + protected boolean setState(State currentState, State desiredState) + { + // TODO: add state management + return false; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index 031d518670..e6d3fab2f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -45,7 +45,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer queueAdapter.getName(), subscription.getSessionModel().getConnectionModel().getRemoteAddressString(), String.valueOf(subscription.getSessionModel().getChannelId()), - subscription.getConsumerName())); + subscription.getConsumerName()), queueAdapter.getTaskExecutor()); _subscription = subscription; _queue = queueAdapter; _statistics = new ConsumerStatistics(); @@ -108,13 +108,6 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer } @Override - public Object setAttribute(final String name, final Object expected, final Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - } - - @Override public Object getAttribute(final String name) { if(ID.equals(name)) @@ -222,4 +215,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer return null; // TODO - Implement } } + + @Override + protected boolean setState(State currentState, State desiredState) + { + // TODO : Add state management + return false; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index df0f29fbc3..5d5f3f7378 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -33,16 +33,15 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener @@ -57,7 +56,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa public ExchangeAdapter(final VirtualHostAdapter virtualHostAdapter, final org.apache.qpid.server.exchange.Exchange exchange) { - super(exchange.getId()); + super(exchange.getId(), virtualHostAdapter.getTaskExecutor()); _statistics = new ExchangeStatistics(); _vhost = virtualHostAdapter; _exchange = exchange; @@ -113,8 +112,8 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa throws AccessControlException, IllegalStateException { attributes = new HashMap<String, Object>(attributes); - String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); - Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP); + String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); + Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap()); attributes.remove(org.apache.qpid.server.model.Binding.NAME); attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS); @@ -257,7 +256,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == org.apache.qpid.server.model.Binding.class) { @@ -369,28 +368,20 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - - @Override public Collection<String> getAttributeNames() { return AVAILABLE_ATTRIBUTES; } @Override - public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, - AccessControlException + protected boolean setState(State currentState, State desiredState) { if (desiredState == State.DELETED) { delete(); - return State.DELETED; + return true; } - return super.setDesiredState(currentState, desiredState); + return false; } private class ExchangeStatistics implements Statistics diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java new file mode 100644 index 0000000000..0fa834bc28 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.security.AccessControlException; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Group; +import org.apache.qpid.server.model.GroupMember; +import org.apache.qpid.server.model.GroupProvider; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.security.group.GroupManager; +import org.apache.qpid.server.security.SecurityManager; + +public class GroupProviderAdapter extends AbstractAdapter implements + GroupProvider +{ + private final GroupManager _groupManager; + private final Broker _broker; + public GroupProviderAdapter(UUID id, GroupManager groupManager, Broker broker) + { + super(id, broker.getTaskExecutor()); + + if (groupManager == null) + { + throw new IllegalArgumentException("GroupManager must not be null"); + } + _groupManager = groupManager; + _broker = broker; + addParent(Broker.class, broker); + } + + @Override + public String getName() + { + return _groupManager.getClass().getSimpleName(); + } + + @Override + public String setName(String currentName, String desiredName) + throws IllegalStateException, AccessControlException + { + return null; + } + + @Override + public State getActualState() + { + return null; + } + + @Override + public boolean isDurable() + { + return true; + } + + @Override + public void setDurable(boolean durable) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return LifetimePolicy.PERMANENT; + } + + @Override + public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, + LifetimePolicy desired) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + return null; + } + + @Override + public long getTimeToLive() + { + return 0; + } + + @Override + public long setTimeToLive(long expected, long desired) + throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + return 0; + } + + @Override + public Statistics getStatistics() + { + return NoStatistics.getInstance(); + } + + @Override + public Collection<String> getAttributeNames() + { + return GroupProvider.AVAILABLE_ATTRIBUTES; + } + + @Override + public Object getAttribute(String name) + { + if (TYPE.equals(name)) + { + return getName(); + } + else if (CREATED.equals(name)) + { + // TODO + } + else if (DURABLE.equals(name)) + { + return true; + } + else if (ID.equals(name)) + { + return getId(); + } + else if (LIFETIME_POLICY.equals(name)) + { + return LifetimePolicy.PERMANENT; + } + else if (NAME.equals(name)) + { + return getName(); + } + else if (STATE.equals(name)) + { + return State.ACTIVE; // TODO + } + else if (TIME_TO_LIVE.equals(name)) + { + // TODO + } + else if (UPDATED.equals(name)) + { + // TODO + } + return super.getAttribute(name); + } + + @Override + public <C extends ConfiguredObject> C addChild(Class<C> childClass, + Map<String, Object> attributes, ConfiguredObject... otherParents) + { + if (childClass == Group.class) + { + String groupName = (String) attributes.get(Group.NAME); + + if (getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName)) + { + _groupManager.createGroup(groupName); + return (C) new GroupAdapter(groupName, getTaskExecutor()); + } + else + { + throw new AccessControlException("Do not have permission" + + " to create new group"); + } + } + + throw new IllegalArgumentException( + "This group provider does not support creating children of type: " + + childClass); + } + + @SuppressWarnings("unchecked") + @Override + public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) + { + if (clazz == Group.class) + { + Set<Principal> groups = _groupManager.getGroupPrincipals(); + Collection<Group> principals = new ArrayList<Group>(groups.size()); + for (Principal group : groups) + { + principals.add(new GroupAdapter(group.getName(), getTaskExecutor())); + } + return (Collection<C>) Collections + .unmodifiableCollection(principals); + } + else + { + return null; + } + } + + private SecurityManager getSecurityManager() + { + return _broker.getSecurityManager(); + } + + private class GroupAdapter extends AbstractAdapter implements Group + { + private final String _group; + + public GroupAdapter(String group, TaskExecutor taskExecutor) + { + super(UUIDGenerator.generateGroupUUID(GroupProviderAdapter.this.getName(), group), taskExecutor); + _group = group; + + } + + @Override + public String getName() + { + return _group; + } + + @Override + public String setName(String currentName, String desiredName) + throws IllegalStateException, AccessControlException + { + throw new IllegalStateException("Names cannot be updated"); + } + + @Override + public State getActualState() + { + return State.ACTIVE; + } + + @Override + public boolean isDurable() + { + return true; + } + + @Override + public void setDurable(boolean durable) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + throw new IllegalStateException("Durability cannot be updated"); + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return LifetimePolicy.PERMANENT; + } + + @Override + public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, + LifetimePolicy desired) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + throw new IllegalStateException("LifetimePolicy cannot be updated"); + } + + @Override + public long getTimeToLive() + { + return 0; + } + + @Override + public long setTimeToLive(long expected, long desired) + throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + throw new IllegalStateException("ttl cannot be updated"); + } + + @Override + public Statistics getStatistics() + { + return NoStatistics.getInstance(); + } + + @Override + public <C extends ConfiguredObject> Collection<C> getChildren( + Class<C> clazz) + { + if (clazz == GroupMember.class) + { + Set<Principal> usersInGroup = _groupManager + .getUserPrincipalsForGroup(_group); + Collection<GroupMember> members = new ArrayList<GroupMember>(); + for (Principal principal : usersInGroup) + { + members.add(new GroupMemberAdapter(principal.getName(), getTaskExecutor())); + } + return (Collection<C>) Collections + .unmodifiableCollection(members); + } + else + { + return null; + } + + } + + @Override + public <C extends ConfiguredObject> C addChild(Class<C> childClass, + Map<String, Object> attributes, + ConfiguredObject... otherParents) + { + if (childClass == GroupMember.class) + { + String memberName = (String) attributes.get(GroupMember.NAME); + + if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group)) + { + _groupManager.addUserToGroup(memberName, _group); + return (C) new GroupMemberAdapter(memberName, getTaskExecutor()); + } + else + { + throw new AccessControlException("Do not have permission" + + " to add new group member"); + } + } + + throw new IllegalArgumentException( + "This group provider does not support creating children of type: " + + childClass); + } + + @Override + public Collection<String> getAttributeNames() + { + return Group.AVAILABLE_ATTRIBUTES; + } + + @Override + public Object getAttribute(String name) + { + if (ID.equals(name)) + { + return getId(); + } + else if (NAME.equals(name)) + { + return getName(); + } + return super.getAttribute(name); + } + + @Override + protected boolean setState(State currentState, State desiredState) + throws IllegalStateTransitionException, AccessControlException + { + if (desiredState == State.DELETED) + { + if (getSecurityManager().authoriseGroupOperation(Operation.DELETE, _group)) + { + _groupManager.removeGroup(_group); + return true; + } + else + { + throw new AccessControlException("Do not have permission to delete group"); + } + } + + return false; + } + + private class GroupMemberAdapter extends AbstractAdapter implements + GroupMember + { + private String _memberName; + + public GroupMemberAdapter(String memberName, TaskExecutor taskExecutor) + { + super(UUIDGenerator.generateGroupMemberUUID(GroupProviderAdapter.this.getName(), _group, memberName), taskExecutor); + _memberName = memberName; + } + + @Override + public Collection<String> getAttributeNames() + { + return GroupMember.AVAILABLE_ATTRIBUTES; + } + + @Override + public Object getAttribute(String name) + { + if (ID.equals(name)) + { + return getId(); + } + else if (NAME.equals(name)) + { + return getName(); + } + return super.getAttribute(name); + } + + @Override + public String getName() + { + return _memberName; + } + + @Override + public String setName(String currentName, String desiredName) + throws IllegalStateException, AccessControlException + { + return null; + } + + @Override + public State getActualState() + { + return null; + } + + @Override + public boolean isDurable() + { + return false; + } + + @Override + public void setDurable(boolean durable) + throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return null; + } + + @Override + public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, + LifetimePolicy desired) throws IllegalStateException, + AccessControlException, IllegalArgumentException + { + return null; + } + + @Override + public long getTimeToLive() + { + return 0; + } + + @Override + public long setTimeToLive(long expected, long desired) + throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + return 0; + } + + @Override + public Statistics getStatistics() + { + return NoStatistics.getInstance(); + } + + @Override + public <C extends ConfiguredObject> Collection<C> getChildren( + Class<C> clazz) + { + return null; + } + + @Override + public <C extends ConfiguredObject> C createChild( + Class<C> childClass, Map<String, Object> attributes, + ConfiguredObject... otherParents) + { + return null; + } + + @Override + protected boolean setState(State currentState, State desiredState) + throws IllegalStateTransitionException, + AccessControlException + { + if (desiredState == State.DELETED) + { + if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group)) + { + _groupManager.removeUserFromGroup(_memberName, _group); + return true; + } + else + { + throw new AccessControlException("Do not have permission to remove group member"); + } + } + return false; + } + + } + } + + @Override + protected boolean setState(State currentState, State desiredState) + { + if (desiredState == State.ACTIVE) + { + return true; + } + else if (desiredState == State.STOPPED) + { + return true; + } + // TODO: DELETE state is ignored for now + // in case if we need to delete group provider, then we need AuthenticationProvider to be a change listener of it + // in order to remove deleted group provider from its group provider list + return false; + } + + public Set<Principal> getGroupPrincipalsForUser(String username) + { + return _groupManager.getGroupPrincipalsForUser(username); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java deleted file mode 100644 index 823d27160b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.model.adapter; - -import java.security.AccessControlException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Connection; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Protocol; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHostAlias; - -public class HTTPPortAdapter extends AbstractAdapter implements Port -{ - private final BrokerAdapter _broker; - private final int _port; - private final Protocol _protocol; - private final Transport _transport; - - public HTTPPortAdapter(BrokerAdapter brokerAdapter, int port) - { - this(brokerAdapter, port, Protocol.HTTP, Transport.TCP); - } - - public HTTPPortAdapter(BrokerAdapter brokerAdapter, int port, Protocol protocol, Transport transport) - { - super(UUIDGenerator.generateRandomUUID()); - _broker = brokerAdapter; - _port = port; - _protocol = protocol; - _transport = transport; - } - - @Override - public String getBindingAddress() - { - return "0.0.0.0"; - } - - @Override - public int getPort() - { - return _port; - } - - @Override - public Collection<Transport> getTransports() - { - return Collections.singleton(_transport); - } - - @Override - public void addTransport(Transport transport) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public Transport removeTransport(Transport transport) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public Collection<Protocol> getProtocols() - { - return Collections.singleton(_protocol); - } - - @Override - public void addProtocol(Protocol protocol) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public Protocol removeProtocol(Protocol protocol) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public Collection<VirtualHostAlias> getVirtualHostBindings() - { - return Collections.emptySet(); - } - - @Override - public Collection<Connection> getConnections() - { - return Collections.emptySet(); // TODO - Implement - } - - @Override - public String getName() - { - return getBindingAddress() + ":" + getPort(); // TODO - Implement - } - - @Override - public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public State getActualState() - { - return State.ACTIVE; - } - - @Override - public boolean isDurable() - { - return false; // TODO - Implement - } - - @Override - public void setDurable(boolean durable) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); - } - - @Override - public LifetimePolicy getLifetimePolicy() - { - return LifetimePolicy.PERMANENT; - } - - @Override - public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public long getTimeToLive() - { - return 0; // TODO - Implement - } - - @Override - public long setTimeToLive(long expected, long desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); // TODO - Implement - } - - @Override - public Statistics getStatistics() - { - return NoStatistics.getInstance(); - } - - @Override - public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) - { - if(clazz == Connection.class) - { - return (Collection<C>) getConnections(); - } - else - { - return Collections.emptySet(); - } - } - - @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) - { - throw new UnsupportedOperationException(); - } - - @Override - public Object getAttribute(String name) - { - if(ID.equals(name)) - { - return getId(); - } - else if(NAME.equals(name)) - { - return getName(); - } - else if(STATE.equals(name)) - { - return getActualState(); - } - else if(DURABLE.equals(name)) - { - return isDurable(); - } - else if(LIFETIME_POLICY.equals(name)) - { - return getLifetimePolicy(); - } - else if(TIME_TO_LIVE.equals(name)) - { - return getTimeToLive(); - } - else if(CREATED.equals(name)) - { - - } - else if(UPDATED.equals(name)) - { - - } - else if(BINDING_ADDRESS.equals(name)) - { - return getBindingAddress(); - } - else if(PORT.equals(name)) - { - return getPort(); - } - else if(PROTOCOLS.equals(name)) - { - return getProtocols(); - } - else if(TRANSPORTS.equals(name)) - { - return getTransports(); - } - - return super.getAttribute(name); //TODO - Implement - } - - @Override - public Collection<String> getAttributeNames() - { - return AVAILABLE_ATTRIBUTES; - } - - @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java new file mode 100644 index 0000000000..113d895e62 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.KeyStore; + +public class KeyStoreAdapter extends AbstractKeyStoreAdapter implements KeyStore +{ + + public KeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes) + { + super(id, broker, attributes); + if (attributes.get(CERTIFICATE_ALIAS) != null) + { + changeAttribute(CERTIFICATE_ALIAS, null, attributes.get(CERTIFICATE_ALIAS)); + } + } + + @Override + public Collection<String> getAttributeNames() + { + return AVAILABLE_ATTRIBUTES; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java index 7653fcc9b9..c4a531c923 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java @@ -21,7 +21,16 @@ package org.apache.qpid.server.model.adapter; +import java.security.AccessControlException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.LifetimePolicy; @@ -30,119 +39,82 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.transport.QpidAcceptor; - -import java.net.InetSocketAddress; -import java.security.AccessControlException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import org.apache.qpid.server.configuration.updater.TaskExecutor; public class PortAdapter extends AbstractAdapter implements Port { - private final BrokerAdapter _broker; - private final QpidAcceptor _acceptor; - private final InetSocketAddress _address; - private final Collection<Protocol> _protocols; - - public PortAdapter(BrokerAdapter brokerAdapter, QpidAcceptor acceptor, InetSocketAddress address) - { - super(UUIDGenerator.generateRandomUUID()); - _broker = brokerAdapter; - _acceptor = acceptor; - _address = address; - List<Protocol> protocols = new ArrayList<Protocol>(); + private final Broker _broker; + private AuthenticationProvider _authenticationProvider; - for(AmqpProtocolVersion pv : _acceptor.getSupported()) - { - switch(pv) - { - case v0_8: - protocols.add(Protocol.AMQP_0_8); - break; - case v0_9: - protocols.add(Protocol.AMQP_0_9); - break; - case v0_9_1: - protocols.add(Protocol.AMQP_0_9_1); - break; - case v0_10: - protocols.add(Protocol.AMQP_0_10); - break; - case v1_0_0: - protocols.add(Protocol.AMQP_1_0); - break; - } - } + /* + * TODO register PortAceptor as a listener. For supporting multiple + * protocols on the same port we need to introduce a special entity like + * PortAceptor which will be responsible for port binding/unbinding + */ + public PortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaults, TaskExecutor taskExecutor) + { + super(id, defaults, attributes, taskExecutor); + _broker = broker; - _protocols = Collections.unmodifiableCollection(protocols); + addParent(Broker.class, broker); } @Override public String getBindingAddress() { - return _address.getHostName(); + return (String)getAttribute(BINDING_ADDRESS); } @Override public int getPort() { - return _address.getPort(); + return (Integer)getAttribute(PORT); } + @SuppressWarnings("unchecked") @Override public Collection<Transport> getTransports() { - switch (_acceptor.getTransport()) - { - case TCP: - return Collections.singleton(Transport.TCP); - case SSL: - return Collections.singleton(Transport.SSL); - } - - return null; // TODO - Implement + return (Collection<Transport>)getAttribute(TRANSPORTS); } @Override public void addTransport(Transport transport) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override public Transport removeTransport(Transport transport) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } + @SuppressWarnings("unchecked") @Override public Collection<Protocol> getProtocols() { - return _protocols; + return (Collection<Protocol>)getAttribute(PROTOCOLS); } @Override public void addProtocol(Protocol protocol) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override public Protocol removeProtocol(Protocol protocol) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override @@ -165,31 +137,36 @@ public class PortAdapter extends AbstractAdapter implements Port @Override public Collection<Connection> getConnections() { - return null; // TODO - Implement + return null; } @Override public String getName() { - return getBindingAddress() + ":" + getPort(); // TODO - Implement + return (String)getAttribute(NAME); } @Override public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override public State getActualState() { - return State.ACTIVE; + State state = (State)super.getAttribute(STATE); + if (state == null) + { + return State.ACTIVE; + } + return state; } @Override public boolean isDurable() { - return false; // TODO - Implement + return false; } @Override @@ -209,20 +186,20 @@ public class PortAdapter extends AbstractAdapter implements Port public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override public long getTimeToLive() { - return 0; // TODO - Implement + return 0; } @Override public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new IllegalStateException(); // TODO - Implement + throw new IllegalStateException(); } @Override @@ -257,10 +234,6 @@ public class PortAdapter extends AbstractAdapter implements Port { return getId(); } - else if(NAME.equals(name)) - { - return getName(); - } else if(STATE.equals(name)) { return getActualState(); @@ -285,36 +258,54 @@ public class PortAdapter extends AbstractAdapter implements Port { } - else if(BINDING_ADDRESS.equals(name)) - { - return getBindingAddress(); - } - else if(PORT.equals(name)) + return super.getAttribute(name); + } + + @Override + public Collection<String> getAttributeNames() + { + return AVAILABLE_ATTRIBUTES; + } + + @Override + public boolean setState(State currentState, State desiredState) + { + if (desiredState == State.DELETED) { - return getPort(); + return true; } - else if(PROTOCOLS.equals(name)) + else if (desiredState == State.ACTIVE) { - return getProtocols(); + onActivate(); + return true; } - else if(TRANSPORTS.equals(name)) + else if (desiredState == State.STOPPED) { - return getTransports(); + onStop(); + return true; } + return false; + } - return super.getAttribute(name); //TODO - Implement + protected void onActivate() + { + // no-op: expected to be overridden by subclass } - @Override - public Collection<String> getAttributeNames() + protected void onStop() { - return AVAILABLE_ATTRIBUTES; + // no-op: expected to be overridden by subclass } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException + public AuthenticationProvider getAuthenticationProvider() + { + return _authenticationProvider; + } + + public void setAuthenticationProvider(AuthenticationProvider authenticationProvider) { - return super.setAttribute(name, expected, desired); //TODO - Implement + _authenticationProvider = authenticationProvider; } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java new file mode 100644 index 0000000000..b7441b9f3b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Protocol.ProtocolType; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.util.MapValueConverter; + +public class PortFactory +{ + public static final int DEFAULT_AMQP_SEND_BUFFER_SIZE = 262144; + public static final int DEFAULT_AMQP_RECEIVE_BUFFER_SIZE = 262144; + public static final boolean DEFAULT_AMQP_NEED_CLIENT_AUTH = false; + public static final boolean DEFAULT_AMQP_WANT_CLIENT_AUTH = false; + public static final boolean DEFAULT_AMQP_TCP_NO_DELAY = true; + public static final String DEFAULT_AMQP_BINDING = "*"; + public static final Transport DEFAULT_TRANSPORT = Transport.TCP; + + private final Collection<Protocol> _defaultProtocols; + + public PortFactory() + { + Set<Protocol> defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, + Protocol.AMQP_0_10, Protocol.AMQP_1_0); + String excludedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES); + if (excludedProtocols != null) + { + String[] excludes = excludedProtocols.split(","); + for (String exclude : excludes) + { + Protocol protocol = Protocol.valueOf(exclude); + defaultProtocols.remove(protocol); + } + } + String includedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES); + if (includedProtocols != null) + { + String[] includes = includedProtocols.split(","); + for (String include : includes) + { + Protocol protocol = Protocol.valueOf(include); + defaultProtocols.add(protocol); + } + } + _defaultProtocols = Collections.unmodifiableCollection(defaultProtocols); + } + + public Port createPort(UUID id, Broker broker, Map<String, Object> objectAttributes) + { + Map<String, Object> attributes = retrieveAttributes(objectAttributes); + + final Port port; + Map<String, Object> defaults = new HashMap<String, Object>(); + defaults.put(Port.TRANSPORTS, Collections.singleton(DEFAULT_TRANSPORT)); + Object portValue = attributes.get(Port.PORT); + if (portValue == null) + { + throw new IllegalConfigurationException("Port attribute is not specified for port: " + attributes); + } + if (isAmqpProtocol(attributes)) + { + Object binding = attributes.get(Port.BINDING_ADDRESS); + if (binding == null) + { + binding = DEFAULT_AMQP_BINDING; + defaults.put(Port.BINDING_ADDRESS, DEFAULT_AMQP_BINDING); + } + defaults.put(Port.NAME, binding + ":" + portValue); + defaults.put(Port.PROTOCOLS, _defaultProtocols); + defaults.put(Port.TCP_NO_DELAY, DEFAULT_AMQP_TCP_NO_DELAY); + defaults.put(Port.WANT_CLIENT_AUTH, DEFAULT_AMQP_WANT_CLIENT_AUTH); + defaults.put(Port.NEED_CLIENT_AUTH, DEFAULT_AMQP_NEED_CLIENT_AUTH); + defaults.put(Port.RECEIVE_BUFFER_SIZE, DEFAULT_AMQP_RECEIVE_BUFFER_SIZE); + defaults.put(Port.SEND_BUFFER_SIZE, DEFAULT_AMQP_SEND_BUFFER_SIZE); + port = new AmqpPortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor()); + } + else + { + @SuppressWarnings("unchecked") + Collection<Protocol> protocols = (Collection<Protocol>)attributes.get(Port.PROTOCOLS); + if (protocols.size() > 1) + { + throw new IllegalConfigurationException("Only one protocol can be used on non AMQP port"); + } + Protocol protocol = protocols.iterator().next(); + defaults.put(Port.NAME, portValue + "-" + protocol.name()); + port = new PortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor()); + } + return port; + } + + private Map<String, Object> retrieveAttributes(Map<String, Object> objectAttributes) + { + Map<String, Object> attributes = new HashMap<String, Object>(objectAttributes); + + if (objectAttributes.containsKey(Port.PROTOCOLS)) + { + final Set<Protocol> protocolSet = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, objectAttributes, Protocol.class); + attributes.put(Port.PROTOCOLS, protocolSet); + } + + if (objectAttributes.containsKey(Port.TRANSPORTS)) + { + final Set<Transport> transportSet = MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, objectAttributes, + Transport.class); + attributes.put(Port.TRANSPORTS, transportSet); + } + + if (objectAttributes.containsKey(Port.PORT)) + { + Integer port = MapValueConverter.getIntegerAttribute(Port.PORT, objectAttributes); + attributes.put(Port.PORT, port); + } + + if (objectAttributes.containsKey(Port.TCP_NO_DELAY)) + { + boolean tcpNoDelay = MapValueConverter.getBooleanAttribute(Port.TCP_NO_DELAY, objectAttributes); + attributes.put(Port.TCP_NO_DELAY, tcpNoDelay); + } + + if (objectAttributes.containsKey(Port.RECEIVE_BUFFER_SIZE)) + { + int receiveBufferSize = MapValueConverter.getIntegerAttribute(Port.RECEIVE_BUFFER_SIZE, objectAttributes); + attributes.put(Port.RECEIVE_BUFFER_SIZE, receiveBufferSize); + } + + if (objectAttributes.containsKey(Port.SEND_BUFFER_SIZE)) + { + int sendBufferSize = MapValueConverter.getIntegerAttribute(Port.SEND_BUFFER_SIZE, objectAttributes); + attributes.put(Port.SEND_BUFFER_SIZE, sendBufferSize); + } + + if (objectAttributes.containsKey(Port.NEED_CLIENT_AUTH)) + { + boolean needClientAuth = MapValueConverter.getBooleanAttribute(Port.NEED_CLIENT_AUTH, objectAttributes); + attributes.put(Port.NEED_CLIENT_AUTH, needClientAuth); + } + + if (objectAttributes.containsKey(Port.WANT_CLIENT_AUTH)) + { + boolean wantClientAuth = MapValueConverter.getBooleanAttribute(Port.WANT_CLIENT_AUTH, objectAttributes); + attributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth); + } + + if (objectAttributes.containsKey(Port.BINDING_ADDRESS)) + { + String binding = MapValueConverter.getStringAttribute(Port.BINDING_ADDRESS, objectAttributes); + attributes.put(Port.BINDING_ADDRESS, binding); + } + + if (objectAttributes.containsKey(Port.STATE)) + { + State state = MapValueConverter.getEnumAttribute(State.class, Port.STATE, objectAttributes); + attributes.put(Port.STATE, state); + } + return attributes; + } + + private boolean isAmqpProtocol(Map<String, Object> portAttributes) + { + @SuppressWarnings("unchecked") + Set<Protocol> protocols = (Set<Protocol>) portAttributes.get(Port.PROTOCOLS); + if (protocols == null || protocols.isEmpty()) + { + // defaulting to AMQP if protocol is not specified + return true; + } + + Set<ProtocolType> protocolTypes = new HashSet<ProtocolType>(); + for (Protocol protocolObject : protocols) + { + protocolTypes.add(protocolObject.getProtocolType()); + } + + if (protocolTypes.size() > 1) + { + throw new IllegalConfigurationException("Found different protocol types '" + protocolTypes + + "' on port configuration: " + portAttributes); + } + + return protocolTypes.contains(ProtocolType.AMQP); + } + + public Collection<Protocol> getDefaultProtocols() + { + return _defaultProtocols; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 78f6d38d93..f3ddf32e5a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.MapValueConverter; final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener { @@ -50,15 +51,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>(); static { - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); @@ -78,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue) { - super(queue.getId()); + super(queue.getId(), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter); @@ -205,47 +207,47 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } @Override - public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException + public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { try { if(ALERT_REPEAT_GAP.equals(name)) { _queue.setMinimumAlertRepeatGap((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name)) { _queue.setMaximumMessageAge((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name)) { _queue.setMaximumMessageSize((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name)) { _queue.setMaximumQueueDepth((Long)desired); - return desired; + return true; } else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name)) { _queue.setMaximumMessageCount((Long)desired); - return desired; + return true; } else if(ALTERNATE_EXCHANGE.equals(name)) { // In future we may want to accept a UUID as an alternative way to identifying the exchange ExchangeAdapter alternateExchange = (ExchangeAdapter) desired; _queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange()); - return desired; + return true; } else if(EXCLUSIVE.equals(name)) { Boolean exclusiveFlag = (Boolean) desired; _queue.setExclusive(exclusiveFlag); - return desired; + return true; } else if(MESSAGE_GROUP_KEY.equals(name)) { @@ -266,7 +268,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) { _queue.setMaximumDeliveryCount((Integer)desired); - return desired; + return true; } else if(NO_LOCAL.equals(name)) { @@ -279,12 +281,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name)) { _queue.setCapacity((Long)desired); - return desired; + return true; } else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) { _queue.setFlowResumeCapacity((Long)desired); - return desired; + return true; } else if(QUEUE_FLOW_STOPPED.equals(name)) { @@ -301,10 +303,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs else if (DESCRIPTION.equals(name)) { _queue.setDescription((String) desired); - return desired; + return true; } - return super.setAttribute(name, expected, desired); + return super.changeAttribute(name, expected, desired); } finally { @@ -495,8 +497,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs throws AccessControlException, IllegalStateException { attributes = new HashMap<String, Object>(attributes); - String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); - Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP); + String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, ""); + Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap()); attributes.remove(org.apache.qpid.server.model.Binding.NAME); attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS); @@ -508,7 +510,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == org.apache.qpid.server.model.Binding.class) { @@ -712,15 +714,15 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } @Override - public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { if (desiredState == State.DELETED) { delete(); - return State.DELETED; + return true; } - return super.setDesiredState(currentState, desiredState); + return false; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index d802697d67..2fffdb32f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; final class SessionAdapter extends AbstractAdapter implements Session @@ -44,9 +45,9 @@ final class SessionAdapter extends AbstractAdapter implements Session private AMQSessionModel _session; private SessionStatistics _statistics; - public SessionAdapter(final AMQSessionModel session) + public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateRandomUUID()); + super(UUIDGenerator.generateRandomUUID(), taskExecutor); _session = session; _statistics = new SessionStatistics(); } @@ -141,13 +142,6 @@ final class SessionAdapter extends AbstractAdapter implements Session return super.getAttribute(name); //TODO - Implement } - @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - public Statistics getStatistics() { return _statistics; @@ -237,4 +231,11 @@ final class SessionAdapter extends AbstractAdapter implements Session return null; // TODO - Implement } } + + @Override + protected boolean setState(State currentState, State desiredState) + { + // TODO : add state management + return false; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java new file mode 100644 index 0000000000..bdffe605ec --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model.adapter; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.TrustStore; + +public class TrustStoreAdapter extends AbstractKeyStoreAdapter implements TrustStore +{ + public TrustStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes) + { + super(id, broker, attributes); + } + + @Override + public Collection<String> getAttributeNames() + { + return AVAILABLE_ATTRIBUTES; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 35838e51d2..1d50be279f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.model.adapter; +import java.io.File; import java.security.AccessControlException; import java.security.Principal; import java.util.ArrayList; @@ -31,17 +32,27 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.configuration.SystemConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueType; import org.apache.qpid.server.model.State; @@ -49,22 +60,38 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, +public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, QueueRegistry.RegistryChangeListener, IConnectionRegistry.RegistryChangeListener { - private final org.apache.qpid.server.virtualhost.VirtualHost _virtualHost; + @SuppressWarnings("serial") + public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{ + put(NAME, String.class); + put(STORE_PATH, String.class); + put(STORE_TYPE, String.class); + put(CONFIG_PATH, String.class); + put(STATE, State.class); + }}); + + private org.apache.qpid.server.virtualhost.VirtualHost _virtualHost; private final Map<AMQConnectionModel, ConnectionAdapter> _connectionAdapters = new HashMap<AMQConnectionModel, ConnectionAdapter>(); @@ -74,37 +101,52 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E private final Map<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter> _exchangeAdapters = new HashMap<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter>(); - - private final StatisticsAdapter _statistics; - - private final BrokerAdapter _broker; - + private StatisticsAdapter _statistics; + private final Broker _broker; private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>(); + private StatisticsGatherer _brokerStatisticsGatherer; - - VirtualHostAdapter(BrokerAdapter brokerAdapter, - final org.apache.qpid.server.virtualhost.VirtualHost virtualHost) + public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor) { - super(virtualHost.getId()); - _broker = brokerAdapter; - _virtualHost = virtualHost; - _statistics = new VirtualHostStatisticsAdapter(virtualHost); - virtualHost.getQueueRegistry().addRegistryChangeListener(this); - populateQueues(); - virtualHost.getExchangeRegistry().addRegistryChangeListener(this); - populateExchanges(); - virtualHost.getConnectionRegistry().addRegistryChangeListener(this); - populateConnections(); - + super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); + validateAttributes(); + _broker = broker; + _brokerStatisticsGatherer = brokerStatisticsGatherer; + addParent(Broker.class, broker); + } + private void validateAttributes() + { + String name = getName(); + if (name == null || "".equals(name.trim())) + { + throw new IllegalConfigurationException("Virtual host name must be specified"); + } - for(Port port :_broker.getPorts()) + String configurationFile = (String) getAttribute(CONFIG_PATH); + String storePath = (String) getAttribute(STORE_PATH); + String storeType = (String) getAttribute(STORE_TYPE); + boolean invalidAttributes = false; + if (configurationFile == null) + { + if (storePath == null || storeType == null) + { + invalidAttributes = true; + } + } + else + { + if (storePath != null || storeType != null) + { + invalidAttributes = true; + } + } + if (invalidAttributes) { - _aliases.add(new VirtualHostAliasAdapter(this, port)); + throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or both 'storePath' and 'storeType' attributes"); } } - private void populateExchanges() { Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges = @@ -126,37 +168,22 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E private void populateQueues() { Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues(); - - synchronized(_queueAdapters) - { - for(AMQQueue queue : actualQueues) - { - if(!_queueAdapters.containsKey(queue)) - { - _queueAdapters.put(queue, new QueueAdapter(this,queue)); - } - } - } - } - - private void populateConnections() - { - - List<AMQConnectionModel> actualConnections = _virtualHost.getConnectionRegistry().getConnections(); - - synchronized(_connectionAdapters) + if ( actualQueues != null ) { - for(AMQConnectionModel conn : actualConnections) + synchronized(_queueAdapters) { - if(!_connectionAdapters.containsKey(conn)) + for(AMQQueue queue : actualQueues) { - _connectionAdapters.put(conn, new ConnectionAdapter(conn)); + if(!_queueAdapters.containsKey(queue)) + { + _queueAdapters.put(queue, new QueueAdapter(this, queue)); + } } } } - } + @Override public String getReplicationGroupName() { return null; //TODO @@ -198,12 +225,12 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { attributes = new HashMap<String, Object>(attributes); - String name = getStringAttribute(Exchange.NAME, attributes, null); - State state = getEnumAttribute(State.class, Exchange.STATE, attributes, State.ACTIVE); - boolean durable = getBooleanAttribute(Exchange.DURABLE, attributes, false); - LifetimePolicy lifetime = getEnumAttribute(LifetimePolicy.class, Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); - String type = getStringAttribute(Exchange.TYPE, attributes, null); - long ttl = getLongAttribute(Exchange.TIME_TO_LIVE, attributes, 0l); + String name = MapValueConverter.getStringAttribute(Exchange.NAME, attributes, null); + State state = MapValueConverter.getEnumAttribute(State.class, Exchange.STATE, attributes, State.ACTIVE); + boolean durable = MapValueConverter.getBooleanAttribute(Exchange.DURABLE, attributes, false); + LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); + String type = MapValueConverter.getStringAttribute(Exchange.TYPE, attributes, null); + long ttl = MapValueConverter.getLongAttribute(Exchange.TIME_TO_LIVE, attributes, 0l); attributes.remove(Exchange.NAME); attributes.remove(Exchange.STATE); @@ -266,7 +293,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E if (attributes.containsKey(Queue.TYPE)) { - String typeAttribute = getStringAttribute(Queue.TYPE, attributes, null); + String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null); QueueType queueType = null; try { @@ -289,12 +316,12 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E throw new IllegalArgumentException("Sort key is not specified for sorted queue"); } } - String name = getStringAttribute(Queue.NAME, attributes, null); - State state = getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); - boolean durable = getBooleanAttribute(Queue.DURABLE, attributes, false); - LifetimePolicy lifetime = getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); - long ttl = getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l); - boolean exclusive= getBooleanAttribute(Queue.EXCLUSIVE, attributes, false); + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null); + State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); + LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT); + long ttl = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l); + boolean exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false); attributes.remove(Queue.NAME); attributes.remove(Queue.STATE); @@ -328,11 +355,10 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E String owner = null; if(exclusive) { - Set<Principal> principals = - SecurityManager.getThreadSubject().getPrincipals(); - if(principals != null && !principals.isEmpty()) + Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject()); + if(authenticatedPrincipal != null) { - owner = principals.iterator().next().getName(); + owner = authenticatedPrincipal.getName(); } } try @@ -370,7 +396,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E public String getName() { - return _virtualHost.getName(); + return (String)getAttribute(NAME); } public String setName(final String currentName, final String desiredName) @@ -379,9 +405,36 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E throw new IllegalStateException(); } + @Override public State getActualState() { - return getDesiredState(); + if (_virtualHost == null) + { + State state = (State)super.getAttribute(STATE); + if (state == null) + { + return State.INITIALISING; + } + return state; + } + else + { + org.apache.qpid.server.virtualhost.State implementationState = _virtualHost.getState(); + switch(implementationState) + { + case INITIALISING: + return State.INITIALISING; + case ACTIVE: + return State.ACTIVE; + case PASSIVE: + return State.QUIESCED; + case STOPPED: + return State.STOPPED; + default: + // unexpected state + return null; + } + } } public boolean isDurable() @@ -448,7 +501,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Exchange.class) { @@ -548,7 +601,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { if(!_connectionAdapters.containsKey(connection)) { - adapter = new ConnectionAdapter(connection); + adapter = new ConnectionAdapter(connection, getTaskExecutor()); _connectionAdapters.put(connection, adapter); } @@ -709,13 +762,9 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { return getId(); } - else if(NAME.equals(name)) - { - return getName(); - } else if(STATE.equals(name)) { - return State.ACTIVE; + return getActualState(); } else if(DURABLE.equals(name)) { @@ -737,10 +786,19 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { // TODO } - else if(SUPPORTED_EXCHANGE_TYPES.equals(name)) + else if (_virtualHost != null) + { + return getAttributeFromVirtualHostImplementation(name); + } + return super.getAttribute(name); + } + + private Object getAttributeFromVirtualHostImplementation(String name) + { + if(SUPPORTED_EXCHANGE_TYPES.equals(name)) { List<String> types = new ArrayList<String>(); - for(ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes()) + for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes()) { types.add(type.getName().asString()); } @@ -754,10 +812,6 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { return _virtualHost.getConfiguration().isDeadLetterQueueEnabled(); } - else if(FEDERATION_TAG.equals(name)) - { - return _virtualHost.getFederationTag(); - } else if(HOUSEKEEPING_CHECK_PERIOD.equals(name)) { return _virtualHost.getConfiguration().getHousekeepingCheckPeriod(); @@ -778,9 +832,9 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { return _virtualHost.getMessageStore().getStoreType(); } - else if(STORE_CONFIGURATION.equals(name)) + else if(STORE_PATH.equals(name)) { - // TODO + return _virtualHost.getMessageStore().getStoreLocation(); } else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name)) { @@ -822,13 +876,6 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - - @Override public Collection<String> getAttributeNames() { return AVAILABLE_ATTRIBUTES; @@ -889,4 +936,111 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } } + + @Override + protected boolean setState(State currentState, State desiredState) + { + if (desiredState == State.ACTIVE) + { + activate(); + return true; + } + else if (desiredState == State.STOPPED) + { + if (_virtualHost != null) + { + try + { + _virtualHost.close(); + } + finally + { + _broker.getVirtualHostRegistry().unregisterVirtualHost(_virtualHost); + } + } + return true; + } + else if (desiredState == State.DELETED) + { + //TODO: add ACL check to authorize the operation + if (_virtualHost != null && _virtualHost.getState() == org.apache.qpid.server.virtualhost.State.ACTIVE) + { + setDesiredState(currentState, State.STOPPED); + } + return true; + } + return false; + } + + private void activate() + { + VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry(); + String virtualHostName = getName(); + try + { + VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName); + _virtualHost = new VirtualHostImpl(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create virtual host " + virtualHostName, e); + } + + virtualHostRegistry.registerVirtualHost(_virtualHost); + + _statistics = new VirtualHostStatisticsAdapter(_virtualHost); + _virtualHost.getQueueRegistry().addRegistryChangeListener(this); + populateQueues(); + _virtualHost.getExchangeRegistry().addRegistryChangeListener(this); + populateExchanges(); + _virtualHost.getConnectionRegistry().addRegistryChangeListener(this); + + synchronized(_aliases) + { + for(Port port :_broker.getPorts()) + { + if (Protocol.hasAmqpProtocol(port.getProtocols())) + { + _aliases.add(new VirtualHostAliasAdapter(this, port)); + } + } + } + } + + private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName) throws ConfigurationException + { + VirtualHostConfiguration configuration; + String configurationFile = (String)getAttribute(CONFIG_PATH); + if (configurationFile == null) + { + final MyConfiguration basicConfiguration = new MyConfiguration(); + PropertiesConfiguration config = new PropertiesConfiguration(); + config.addProperty("store.type", (String)getAttribute(STORE_TYPE)); + config.addProperty("store.environment-path", (String)getAttribute(STORE_PATH)); + basicConfiguration.addConfiguration(config); + + CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); + compositeConfiguration.addConfiguration(new SystemConfiguration()); + compositeConfiguration.addConfiguration(basicConfiguration); + configuration = new VirtualHostConfiguration(virtualHostName, compositeConfiguration , _broker); + } + else + { + configuration = new VirtualHostConfiguration(virtualHostName, new File(configurationFile) , _broker); + } + return configuration; + } + + @Override + public SecurityManager getSecurityManager() + { + return _virtualHost.getSecurityManager(); + } + + @Override + public MessageStore getMessageStore() + { + return _virtualHost.getMessageStore(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java index 367d1ff518..91b705b004 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java @@ -43,7 +43,7 @@ public class VirtualHostAliasAdapter extends AbstractAdapter implements Virtual public VirtualHostAliasAdapter(VirtualHostAdapter virtualHostAdapter, Port port) { - super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName())); + super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName()), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; _port = port; } @@ -140,4 +140,11 @@ public class VirtualHostAliasAdapter extends AbstractAdapter implements Virtual { throw new UnsupportedOperationException(); } + + @Override + protected boolean setState(State currentState, State desiredState) + { + // TODO: state is not supported at the moment + return false; + } } |