summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/model
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/model')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Broker.java124
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Group.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java55
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Model.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Port.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Transport.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java65
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/User.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java271
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java198
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java152
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java251
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java199
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java807
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java550
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java273
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java169
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java222
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java332
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java9
38 files changed, 3495 insertions, 980 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java b/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java
index 6000886956..417f6036ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java
@@ -24,6 +24,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
+
public interface AuthenticationProvider extends ConfiguredObject
{
@@ -52,4 +55,15 @@ public interface AuthenticationProvider extends ConfiguredObject
TYPE));
//children
Collection<VirtualHostAlias> getVirtualHostPortBindings();
+
+ String getName();
+
+ /**
+ * A temporary method to create SubjectCreator.
+ *
+ * TODO: move all the functionality from SubjectCreator into AuthenticationProvider
+ */
+ SubjectCreator getSubjectCreator();
+
+ void setGroupAccessor(GroupPrincipalAccessor groupPrincipalAccessor);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
index 08b01a1b65..fbecf1965b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -20,12 +20,20 @@
*/
package org.apache.qpid.server.model;
+import java.net.SocketAddress;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import org.apache.qpid.server.logging.LogRecorder;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
public interface Broker extends ConfiguredObject
{
@@ -44,9 +52,49 @@ public interface Broker extends ConfiguredObject
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
+ String DEFAULT_AUTHENTICATION_PROVIDER = "defaultAuthenticationProvider";
+ String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost";
+
+ String ALERT_THRESHOLD_MESSAGE_AGE = "alertThresholdMessageAge";
+ String ALERT_THRESHOLD_MESSAGE_COUNT = "alertThresholdMessageCount";
+ String ALERT_THRESHOLD_QUEUE_DEPTH = "alertThresholdQueueDepth";
+ String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize";
+ String ALERT_REPEAT_GAP = "alertRepeatGap";
+ String FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
+ String FLOW_CONTROL_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
+ String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
+ String DEAD_LETTER_QUEUE_ENABLED = "deadLetterQueueEnabled";
+ String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod";
+
+ String SESSION_COUNT_LIMIT = "sessionCountLimit";
+ String HEART_BEAT_DELAY = "heartBeatDelay";
+ String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod";
+ String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled";
+
+ /*
+ * A temporary attribute to pass the path to ACL file.
+ * TODO: It should be a part of AuthorizationProvider.
+ */
+ String ACL_FILE = "aclFile";
+
+ /*
+ * A temporary attributes to set the broker default key/trust stores.
+ * TODO: Remove them after adding a full support to configure KeyStore/TrustStore via management layers.
+ */
+ String KEY_STORE_PATH = "keyStorePath";
+ String KEY_STORE_PASSWORD = "keyStorePassword";
+ String KEY_STORE_CERT_ALIAS = "keyStoreCertAlias";
+ String TRUST_STORE_PATH = "trustStorePath";
+ String TRUST_STORE_PASSWORD = "trustStorePassword";
+
+ /*
+ * A temporary attributes to set the broker group file.
+ * TODO: Remove them after adding a full support to configure authorization providers via management layers.
+ */
+ String GROUP_FILE = "groupFile";
// Attributes
- public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collection<String> AVAILABLE_ATTRIBUTES =
Collections.unmodifiableList(
Arrays.asList(BUILD_VERSION,
BYTES_RETAINED,
@@ -62,7 +110,32 @@ public interface Broker extends ConfiguredObject
NAME,
STATE,
TIME_TO_LIVE,
- UPDATED));
+ UPDATED,
+ DEFAULT_AUTHENTICATION_PROVIDER,
+ DEFAULT_VIRTUAL_HOST,
+ ALERT_THRESHOLD_MESSAGE_AGE,
+ ALERT_THRESHOLD_MESSAGE_COUNT,
+ ALERT_THRESHOLD_QUEUE_DEPTH,
+ ALERT_THRESHOLD_MESSAGE_SIZE,
+ ALERT_REPEAT_GAP,
+ FLOW_CONTROL_SIZE_BYTES,
+ FLOW_CONTROL_RESUME_SIZE_BYTES,
+ MAXIMUM_DELIVERY_ATTEMPTS,
+ DEAD_LETTER_QUEUE_ENABLED,
+ HOUSEKEEPING_CHECK_PERIOD,
+ SESSION_COUNT_LIMIT,
+ HEART_BEAT_DELAY,
+ STATISTICS_REPORTING_PERIOD,
+ STATISTICS_REPORTING_RESET_ENABLED,
+
+ ACL_FILE,
+ KEY_STORE_PATH,
+ KEY_STORE_PASSWORD,
+ KEY_STORE_CERT_ALIAS,
+ TRUST_STORE_PATH,
+ TRUST_STORE_PASSWORD,
+ GROUP_FILE
+ ));
//children
Collection < VirtualHost > getVirtualHosts();
@@ -75,6 +148,49 @@ public interface Broker extends ConfiguredObject
LifetimePolicy lifetime, long ttl, Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException;
- void deleteVirtualHost(VirtualHost virtualHost)
- throws AccessControlException, IllegalStateException;
+ AuthenticationProvider getDefaultAuthenticationProvider();
+
+ Collection<GroupProvider> getGroupProviders();
+
+ /**
+ * A temporary hack to expose root message logger via broker instance.
+ * TODO We need a better way to do operational logging, for example, via logging listeners
+ */
+ RootMessageLogger getRootMessageLogger();
+
+ /**
+ * A temporary hack to expose security manager via broker instance.
+ * TODO We need to add and implement an authorization provider configured object instead
+ */
+ SecurityManager getSecurityManager();
+
+ /**
+ * TODO: A temporary hack to expose log recorder via broker instance.
+ */
+ LogRecorder getLogRecorder();
+
+ VirtualHost findVirtualHostByName(String name);
+
+ /**
+ * Get the SubjectCreator for the given socket address.
+ * TODO: move the authentication related functionality into host aliases and AuthenticationProviders
+ *
+ * @param address The (listening) socket address for which the AuthenticationManager is required
+ */
+ SubjectCreator getSubjectCreator(SocketAddress localAddress);
+
+ Collection<KeyStore> getKeyStores();
+
+ Collection<TrustStore> getTrustStores();
+
+ /*
+ * TODO: Remove this method. Eventually the broker will become a registry.
+ */
+ VirtualHostRegistry getVirtualHostRegistry();
+
+ KeyStore getDefaultKeyStore();
+
+ TrustStore getDefaultTrustStore();
+
+ TaskExecutor getTaskExecutor();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
index 78b98faffe..bd7da962ba 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
@@ -36,4 +36,5 @@ public interface ConfigurationChangeListener
void childRemoved(ConfiguredObject object, ConfiguredObject child);
+ void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 414b2d083a..d567a3aa44 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -25,6 +25,9 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+/**
+ * An object that can be "managed" (eg via the web interface) and usually read from configuration.
+ */
public interface ConfiguredObject
{
@@ -47,7 +50,7 @@ public interface ConfiguredObject
* Attempt to change the name of the object
*
* Request a change to the name of the object. The caller must pass in the name it believes the object currently
- * has. If the current name differes from this expected value, then no name change will occur
+ * has. If the current name differs from this expected value, then no name change will occur
*
* @param currentName the name the caller believes the object to have
* @param desiredName the name the caller would like the object to have
@@ -198,14 +201,25 @@ public interface ConfiguredObject
/**
- * Return the value for the given attribute
+ * Return the value for the given attribute name. The actual attribute value
+ * is returned if the configured object has such attribute set. If not, the
+ * value is looked default attributes.
*
- * @param name the name of the attribute
- * @return the value of the attribute at the object (or null if the attribute is not set
+ * @param name
+ * the name of the attribute
+ * @return the value of the attribute at the object (or null if the
+ * attribute value is set neither on object itself no in defaults)
*/
Object getAttribute(String name);
/**
+ * Return the map containing only explicitly set attributes
+ *
+ * @return the map with the attributes
+ */
+ Map<String, Object> getActualAttributes();
+
+ /**
* Set the value of an attribute
*
* @param name the name of the attribute to be set
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Group.java b/java/broker/src/main/java/org/apache/qpid/server/model/Group.java
new file mode 100644
index 0000000000..aacd515107
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Group.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface Group extends ConfiguredObject
+{
+ String CREATED = "created";
+ String DURABLE = "durable";
+ String ID = "id";
+ String LIFETIME_POLICY = "lifetimePolicy";
+ String NAME = "name";
+ String STATE = "state";
+ String TIME_TO_LIVE = "timeToLive";
+ String UPDATED = "updated";
+
+ // Attributes
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED
+ ));
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java b/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java
new file mode 100644
index 0000000000..6832cc6fa6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/GroupMember.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface GroupMember extends ConfiguredObject
+{
+ String CREATED = "created";
+ String DURABLE = "durable";
+ String ID = "id";
+ String LIFETIME_POLICY = "lifetimePolicy";
+ String NAME = "name";
+ String STATE = "state";
+ String TIME_TO_LIVE = "timeToLive";
+ String UPDATED = "updated";
+
+ // Attributes
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED
+ ));
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java b/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java
new file mode 100644
index 0000000000..9016f97927
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/GroupProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+public interface GroupProvider extends ConfiguredObject
+{
+ public static final String ID = "id";
+ public static final String DESCRIPTION = "description";
+ public static final String NAME = "name";
+ public static final String STATE = "state";
+ public static final String DURABLE = "durable";
+ public static final String LIFETIME_POLICY = "lifetimePolicy";
+ public static final String TIME_TO_LIVE = "timeToLive";
+ public static final String CREATED = "created";
+ public static final String UPDATED = "updated";
+ public static final String TYPE = "type";
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(ID,
+ NAME,
+ DESCRIPTION,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ TYPE));
+
+ Set<Principal> getGroupPrincipalsForUser(String username);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java b/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java
new file mode 100644
index 0000000000..959714656b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/KeyStore.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface KeyStore extends TrustStore
+{
+
+ String CERTIFICATE_ALIAS = "certificateAlias";
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ DESCRIPTION,
+ PATH,
+ PASSWORD,
+ TYPE,
+ KEY_MANAGER_FACTORY_ALGORITHM,
+ CERTIFICATE_ALIAS
+ ));
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
index 36179fc105..2c05dce9cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Model.java
@@ -47,6 +47,10 @@ public class Model
addRelationship(Broker.class, VirtualHost.class);
addRelationship(Broker.class, Port.class);
addRelationship(Broker.class, AuthenticationProvider.class);
+ addRelationship(Broker.class, GroupProvider.class);
+ addRelationship(Broker.class, TrustStore.class);
+ addRelationship(Broker.class, KeyStore.class);
+ addRelationship(Broker.class, Plugin.class);
addRelationship(VirtualHost.class, Exchange.class);
addRelationship(VirtualHost.class, Queue.class);
@@ -54,6 +58,10 @@ public class Model
addRelationship(VirtualHost.class, VirtualHostAlias.class);
addRelationship(AuthenticationProvider.class, User.class);
+ addRelationship(User.class, GroupMember.class);
+
+ addRelationship(GroupProvider.class, Group.class);
+ addRelationship(Group.class, GroupMember.class);
addRelationship(Connection.class, Session.class);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java b/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java
new file mode 100644
index 0000000000..b9503a5841
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Plugin.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface Plugin extends ConfiguredObject
+{
+ //Hack, using it for the class name only for consistency with the other things.
+ String CREATED = "created";
+ String DURABLE = "durable";
+ String ID = "id";
+ String LIFETIME_POLICY = "lifetimePolicy";
+ String NAME = "name";
+ String STATE = "state";
+ String TIME_TO_LIVE = "timeToLive";
+ String UPDATED = "updated";
+
+ // Attributes
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED
+ ));
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Port.java b/java/broker/src/main/java/org/apache/qpid/server/model/Port.java
index 50c0ebcd14..2f94c3cab7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Port.java
@@ -39,6 +39,16 @@ public interface Port extends ConfiguredObject
String PORT = "port";
String PROTOCOLS = "protocols";
String TRANSPORTS = "transports";
+ String TCP_NO_DELAY = "tcpNoDelay";
+ String SEND_BUFFER_SIZE = "sendBufferSize";
+ String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
+ String NEED_CLIENT_AUTH = "needClientAuth";
+ String WANT_CLIENT_AUTH = "wantClientAuth";
+
+ /**
+ * TODO: rename it to AUTHENTICATION_MANAGER_ID or introduce relationships
+ */
+ String AUTHENTICATION_MANAGER = "authenticationManager";
// Attributes
public static final Collection<String> AVAILABLE_ATTRIBUTES =
@@ -55,7 +65,13 @@ public interface Port extends ConfiguredObject
BINDING_ADDRESS,
PORT,
PROTOCOLS,
- TRANSPORTS
+ TRANSPORTS,
+ TCP_NO_DELAY,
+ SEND_BUFFER_SIZE,
+ RECEIVE_BUFFER_SIZE,
+ NEED_CLIENT_AUTH,
+ WANT_CLIENT_AUTH,
+ AUTHENTICATION_MANAGER
));
@@ -88,4 +104,8 @@ public interface Port extends ConfiguredObject
//children
Collection<VirtualHostAlias> getVirtualHostBindings();
Collection<Connection> getConnections();
+
+ AuthenticationProvider getAuthenticationProvider();
+
+ void setAuthenticationProvider(AuthenticationProvider authenticationProvider);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java b/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java
index 5d9de69f9a..6cd5eb23a4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Protocol.java
@@ -20,14 +20,95 @@
*/
package org.apache.qpid.server.model;
+import java.util.Collection;
+import java.util.EnumSet;
+
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+
public enum Protocol
{
- AMQP_0_8,
- AMQP_0_9,
- AMQP_0_9_1,
- AMQP_0_10,
- AMQP_1_0,
- JMX,
- HTTP,
- HTTPS
+ AMQP_0_8(ProtocolType.AMQP),
+ AMQP_0_9(ProtocolType.AMQP),
+ AMQP_0_9_1(ProtocolType.AMQP),
+ AMQP_0_10(ProtocolType.AMQP),
+ AMQP_1_0(ProtocolType.AMQP),
+ JMX_RMI(ProtocolType.JMX),
+ HTTP(ProtocolType.HTTP),
+ HTTPS(ProtocolType.HTTP),
+ RMI(ProtocolType.RMI);
+
+ private final ProtocolType _protocolType;
+
+ private Protocol(ProtocolType type)
+ {
+ _protocolType = type;
+ }
+
+ public ProtocolType getProtocolType()
+ {
+ return _protocolType;
+ }
+
+ public boolean isAMQP()
+ {
+ return _protocolType == ProtocolType.AMQP;
+ }
+
+ public AmqpProtocolVersion toAmqpProtocolVersion()
+ {
+ switch(this)
+ {
+ case AMQP_0_8:
+ return AmqpProtocolVersion.v0_8;
+ case AMQP_0_9:
+ return AmqpProtocolVersion.v0_9;
+ case AMQP_0_9_1:
+ return AmqpProtocolVersion.v0_9_1;
+ case AMQP_0_10:
+ return AmqpProtocolVersion.v0_10;
+ case AMQP_1_0:
+ return AmqpProtocolVersion.v1_0_0;
+ default:
+ throw new IllegalArgumentException(this + " is not an known AMQP protocol");
+ }
+ }
+
+ public static Protocol valueOfObject(Object protocolObject)
+ {
+ Protocol protocol;
+ if (protocolObject instanceof Protocol)
+ {
+ protocol = (Protocol) protocolObject;
+ }
+ else
+ {
+ try
+ {
+ protocol = Protocol.valueOf(String.valueOf(protocolObject));
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Can't convert '" + protocolObject
+ + "' to one of the supported protocols: " + EnumSet.allOf(Protocol.class), e);
+ }
+ }
+ return protocol;
+ }
+
+ public static boolean hasAmqpProtocol(Collection<Protocol> protocols)
+ {
+ for (Protocol protocol : protocols)
+ {
+ if (protocol.isAMQP())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static enum ProtocolType
+ {
+ AMQP, HTTP, JMX, RMI;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java b/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java
index 03cd46be01..ae6e5ac43a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Transport.java
@@ -20,8 +20,32 @@
*/
package org.apache.qpid.server.model;
+import java.util.EnumSet;
+
public enum Transport
{
TCP,
- SSL
+ SSL;
+
+ public static Transport valueOfObject(Object transportObject)
+ {
+ Transport transport;
+ if (transportObject instanceof Transport)
+ {
+ transport = (Transport) transportObject;
+ }
+ else
+ {
+ try
+ {
+ transport = Transport.valueOf(String.valueOf(transportObject));
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Can't convert '" + transportObject
+ + "' to one of the supported transports: " + EnumSet.allOf(Transport.class), e);
+ }
+ }
+ return transport;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java b/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java
new file mode 100644
index 0000000000..0c322ae02f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/TrustStore.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface TrustStore extends ConfiguredObject
+{
+ String ID = "id";
+ String NAME = "name";
+ String DURABLE = "durable";
+ String LIFETIME_POLICY = "lifetimePolicy";
+ String STATE = "state";
+ String TIME_TO_LIVE = "timeToLive";
+ String CREATED = "created";
+ String UPDATED = "updated";
+ String DESCRIPTION = "description";
+
+ String PATH = "path";
+ String PASSWORD = "password";
+ String TYPE = "type";
+ String KEY_MANAGER_FACTORY_ALGORITHM = "keyManagerFactoryAlgorithm";
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ DESCRIPTION,
+ PATH,
+ PASSWORD,
+ TYPE,
+ KEY_MANAGER_FACTORY_ALGORITHM
+ ));
+
+ public String getPassword();
+
+ public void setPassword(String password);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java b/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java
index 36b6a454dc..bcedd91596 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java
@@ -63,6 +63,11 @@ public class UUIDGenerator
return createUUID(User.class.getName(), authenticationProviderName, userName);
}
+ public static UUID generateGroupUUID(String groupProviderName, String groupName)
+ {
+ return createUUID(Group.class.getName(), groupProviderName, groupName);
+ }
+
public static UUID generateVhostUUID(String virtualHostName)
{
return createUUID(VirtualHost.class.getName(), virtualHostName);
@@ -77,4 +82,14 @@ public class UUIDGenerator
{
return createUUID(Consumer.class.getName(), virtualHostName, queueName, connectionRemoteAddress, channelNumber, consumerName);
}
+
+ public static UUID generateGroupMemberUUID(String groupProviderName, String groupName, String groupMemberName)
+ {
+ return createUUID(GroupMember.class.getName(), groupProviderName, groupName, groupMemberName);
+ }
+
+ public static UUID generateBrokerChildUUID(String type, String childName)
+ {
+ return createUUID(type, childName);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/User.java b/java/broker/src/main/java/org/apache/qpid/server/model/User.java
index d97bf46d31..675dc8f0d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/User.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/User.java
@@ -52,8 +52,6 @@ public interface User extends ConfiguredObject
PASSWORD
));
- public String getPassword();
-
public void setPassword(String password);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 24a3d43386..5f4ec1d3a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.model;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageStore;
+
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.Collection;
@@ -60,17 +63,16 @@ public interface VirtualHost extends ConfiguredObject
String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes";
String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
String DEAD_LETTER_QUEUE_ENABLED = "deadLetterQueueEnabled";
- String FEDERATION_TAG = "federationTag";
String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod";
String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
- String STORE_CONFIGURATION = "storeConfiguration";
String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose";
String STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "storeTransactionIdleTimeoutWarn";
String STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "storeTransactionOpenTimeoutClose";
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
String STORE_TYPE = "storeType";
+ String STORE_PATH = "storePath";
String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes";
String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes";
String CREATED = "created";
@@ -81,6 +83,8 @@ public interface VirtualHost extends ConfiguredObject
String STATE = "state";
String TIME_TO_LIVE = "timeToLive";
String UPDATED = "updated";
+ String CONFIG_PATH = "configPath";
+
// Attributes
public static final Collection<String> AVAILABLE_ATTRIBUTES =
Collections.unmodifiableList(
@@ -96,13 +100,12 @@ public interface VirtualHost extends ConfiguredObject
SUPPORTED_EXCHANGE_TYPES,
SUPPORTED_QUEUE_TYPES,
DEAD_LETTER_QUEUE_ENABLED,
- FEDERATION_TAG,
HOUSEKEEPING_CHECK_PERIOD,
MAXIMUM_DELIVERY_ATTEMPTS,
QUEUE_FLOW_CONTROL_SIZE_BYTES,
QUEUE_FLOW_RESUME_SIZE_BYTES,
STORE_TYPE,
- STORE_CONFIGURATION,
+ STORE_PATH,
STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE,
STORE_TRANSACTION_IDLE_TIMEOUT_WARN,
STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE,
@@ -111,7 +114,8 @@ public interface VirtualHost extends ConfiguredObject
ALERT_THRESHOLD_MESSAGE_AGE,
ALERT_THRESHOLD_MESSAGE_SIZE,
ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
- ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+ ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ CONFIG_PATH));
@@ -149,4 +153,12 @@ public interface VirtualHost extends ConfiguredObject
}
void executeTransaction(TransactionalOperation op);
+
+ /**
+ * A temporary hack to expose host security manager.
+ * TODO We need to add and implement an authorization provider configured object instead
+ */
+ SecurityManager getSecurityManager();
+
+ MessageStore getMessageStore();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
index 7d6aa9b2cb..73e1f1e970 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
@@ -24,12 +24,18 @@ import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.configuration.updater.ChangeStateTask;
+import org.apache.qpid.server.configuration.updater.CreateChildTask;
+import org.apache.qpid.server.configuration.updater.SetAttributeTask;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
abstract class AbstractAdapter implements ConfiguredObject
{
@@ -40,134 +46,78 @@ abstract class AbstractAdapter implements ConfiguredObject
new ArrayList<ConfigurationChangeListener>();
private final UUID _id;
+ private final Map<String, Object> _defaultAttributes = new HashMap<String, Object>();
+ private final TaskExecutor _taskExecutor;
- protected AbstractAdapter(UUID id)
+ protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor)
{
+ _taskExecutor = taskExecutor;
_id = id;
- }
-
- static String getStringAttribute(String name, Map<String,Object> attributes, String defaultVal)
- {
- final Object value = attributes.get(name);
- return value == null ? defaultVal : String.valueOf(value);
- }
-
- static Map getMapAttribute(String name, Map<String,Object> attributes, Map defaultVal)
- {
- final Object value = attributes.get(name);
- if(value == null)
- {
- return defaultVal;
- }
- else if(value instanceof Map)
+ if (attributes != null)
{
- return (Map) value;
+ Collection<String> names = getAttributeNames();
+ for (String name : names)
+ {
+ if (attributes.containsKey(name))
+ {
+ _attributes.put(name, attributes.get(name));
+ }
+ }
}
- else
+ if (defaults != null)
{
- throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Map");
+ _defaultAttributes.putAll(defaults);
}
}
-
- static <E extends Enum> E getEnumAttribute(Class<E> clazz, String name, Map<String,Object> attributes, E defaultVal)
+ protected AbstractAdapter(UUID id, TaskExecutor taskExecutor)
{
- Object obj = attributes.get(name);
- if(obj == null)
- {
- return defaultVal;
- }
- else if(clazz.isInstance(obj))
- {
- return (E) obj;
- }
- else if(obj instanceof String)
- {
- return (E) Enum.valueOf(clazz, (String)obj);
- }
- else
- {
- throw new IllegalArgumentException("Value for attribute " + name + " is not of required type " + clazz.getSimpleName());
- }
+ this(id, null, null, taskExecutor);
}
- static Boolean getBooleanAttribute(String name, Map<String,Object> attributes, Boolean defaultValue)
+ public final UUID getId()
{
- Object obj = attributes.get(name);
- if(obj == null)
- {
- return defaultValue;
- }
- else if(obj instanceof Boolean)
- {
- return (Boolean) obj;
- }
- else if(obj instanceof String)
- {
- return Boolean.parseBoolean((String) obj);
- }
- else
- {
- throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Boolean");
- }
+ return _id;
}
- static Integer getIntegerAttribute(String name, Map<String,Object> attributes, Integer defaultValue)
+ public State getDesiredState()
{
- Object obj = attributes.get(name);
- if(obj == null)
- {
- return defaultValue;
- }
- else if(obj instanceof Number)
- {
- return ((Number) obj).intValue();
- }
- else if(obj instanceof String)
- {
- return Integer.valueOf((String) obj);
- }
- else
- {
- throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Integer");
- }
+ return null; //TODO
}
- static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue)
+ @Override
+ public final State setDesiredState(final State currentState, final State desiredState)
+ throws IllegalStateTransitionException, AccessControlException
{
- Object obj = attributes.get(name);
- if(obj == null)
- {
- return defaultValue;
- }
- else if(obj instanceof Number)
+ if (_taskExecutor.isTaskExecutorThread())
{
- return ((Number) obj).longValue();
- }
- else if(obj instanceof String)
- {
- return Long.valueOf((String) obj);
+ if (setState(currentState, desiredState))
+ {
+ notifyStateChanged(currentState, desiredState);
+ }
}
else
{
- throw new IllegalArgumentException("Value for attribute " + name + " is not of required type Long");
+ _taskExecutor.submitAndWait(new ChangeStateTask(this, currentState, desiredState));
}
+ return getActualState();
}
- public final UUID getId()
- {
- return _id;
- }
+ /**
+ * @return true when the state has been successfully updated to desiredState or false otherwise
+ */
+ protected abstract boolean setState(State currentState, State desiredState);
- public State getDesiredState()
+ protected void notifyStateChanged(final State currentState, final State desiredState)
{
- return null; //TODO
- }
-
- public State setDesiredState(final State currentState, final State desiredState)
- throws IllegalStateTransitionException, AccessControlException
- {
- return null; //TODO
+ synchronized (_changeListeners)
+ {
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
+ {
+ listener.stateChanged(this, currentState, desiredState);
+ }
+ }
}
public void addChangeListener(final ConfigurationChangeListener listener)
@@ -176,7 +126,7 @@ abstract class AbstractAdapter implements ConfiguredObject
{
throw new NullPointerException("Cannot add a null listener");
}
- synchronized (this)
+ synchronized (_changeListeners)
{
if(!_changeListeners.contains(listener))
{
@@ -191,39 +141,76 @@ abstract class AbstractAdapter implements ConfiguredObject
{
throw new NullPointerException("Cannot remove a null listener");
}
- synchronized (this)
+ synchronized (_changeListeners)
{
return _changeListeners.remove(listener);
}
}
-
protected void childAdded(ConfiguredObject child)
{
- synchronized (this)
+ synchronized (_changeListeners)
{
- for(ConfigurationChangeListener listener : _changeListeners)
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
{
listener.childAdded(this, child);
}
}
}
-
protected void childRemoved(ConfiguredObject child)
{
- synchronized (this)
+ synchronized (_changeListeners)
{
- for(ConfigurationChangeListener listener : _changeListeners)
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
{
listener.childRemoved(this, child);
}
}
}
- public Object getAttribute(final String name)
+ protected void attributeSet(String attrinuteName, Object oldAttributeValue, Object newAttributeValue)
{
- synchronized (this)
+ synchronized (_changeListeners)
+ {
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
+ {
+ listener.attributeSet(this, attrinuteName, oldAttributeValue, newAttributeValue);
+ }
+ }
+ }
+
+ private final Object getDefaultAttribute(String name)
+ {
+ return _defaultAttributes.get(name);
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ Object value = getActualAttribute(name);
+ if (value == null)
+ {
+ value = getDefaultAttribute(name);
+ }
+ return value;
+ }
+
+ @Override
+ public final Map<String, Object> getActualAttributes()
+ {
+ synchronized (_attributes)
+ {
+ return new HashMap<String, Object>(_attributes);
+ }
+ }
+
+ private Object getActualAttribute(final String name)
+ {
+ synchronized (_attributes)
{
return _attributes.get(name);
}
@@ -232,25 +219,41 @@ abstract class AbstractAdapter implements ConfiguredObject
public Object setAttribute(final String name, final Object expected, final Object desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- synchronized (this)
+ if (_taskExecutor.isTaskExecutorThread())
{
- Object currentValue = _attributes.get(name);
+ if (changeAttribute(name, expected, desired))
+ {
+ attributeSet(name, expected, desired);
+ }
+ }
+ else
+ {
+ _taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired));
+ }
+ return getAttribute(name);
+ }
+
+ protected boolean changeAttribute(final String name, final Object expected, final Object desired)
+ {
+ synchronized (_attributes)
+ {
+ Object currentValue = getAttribute(name);
if((currentValue == null && expected == null)
|| (currentValue != null && currentValue.equals(expected)))
{
_attributes.put(name, desired);
- return desired;
+ return true;
}
else
{
- return currentValue;
+ return false;
}
}
}
public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
{
- synchronized (this)
+ synchronized (_parents)
{
return (T) _parents.get(clazz);
}
@@ -258,7 +261,7 @@ abstract class AbstractAdapter implements ConfiguredObject
protected <T extends ConfiguredObject> void addParent(Class<T> clazz, T parent)
{
- synchronized (this)
+ synchronized (_parents)
{
_parents.put(clazz, parent);
}
@@ -280,4 +283,40 @@ abstract class AbstractAdapter implements ConfiguredObject
}
}
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ if (_taskExecutor.isTaskExecutorThread())
+ {
+ C child = addChild(childClass, attributes, otherParents);
+ if (child != null)
+ {
+ childAdded(child);
+ }
+ return child;
+ }
+ else
+ {
+ return (C)_taskExecutor.submitAndWait(new CreateChildTask(this, childClass, attributes, otherParents));
+ }
+ }
+
+ protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+
+ protected TaskExecutor getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java
new file mode 100644
index 0000000000..ebd98f915d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.KeyStore;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.util.MapValueConverter;
+
+public abstract class AbstractKeyStoreAdapter extends AbstractAdapter
+{
+ private String _name;
+ private String _password;
+
+ protected AbstractKeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes)
+ {
+ super(id, broker.getTaskExecutor());
+ addParent(Broker.class, broker);
+ _name = MapValueConverter.getStringAttribute(TrustStore.NAME, attributes);
+ _password = MapValueConverter.getStringAttribute(TrustStore.PASSWORD, attributes);
+ setMandatoryAttribute(TrustStore.PATH, attributes);
+ setOptionalAttribute(TrustStore.TYPE, attributes);
+ setOptionalAttribute(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, attributes);
+ setOptionalAttribute(TrustStore.DESCRIPTION, attributes);
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
+ {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public State getActualState()
+ {
+ return State.ACTIVE;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public void setDurable(boolean durable) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ return NoStatistics.getInstance();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if(KeyStore.ID.equals(name))
+ {
+ return getId();
+ }
+ else if(KeyStore.NAME.equals(name))
+ {
+ return getName();
+ }
+ else if(KeyStore.STATE.equals(name))
+ {
+ return getActualState();
+ }
+ else if(KeyStore.DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ else if(KeyStore.LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if(KeyStore.TIME_TO_LIVE.equals(name))
+ {
+ return getTimeToLive();
+ }
+ else if(KeyStore.CREATED.equals(name))
+ {
+
+ }
+ else if(KeyStore.UPDATED.equals(name))
+ {
+
+ }
+ else if(KeyStore.PASSWORD.equals(name))
+ {
+ return null; // for security reasons we don't expose the password
+ }
+ return super.getAttribute(name);
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ return false;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ private void setMandatoryAttribute(String name, Map<String, Object> attributeValues)
+ {
+ changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues));
+ }
+
+ private void setOptionalAttribute(String name, Map<String, Object> attributeValues)
+ {
+ if (attributeValues.get(name) != null)
+ {
+ changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues));
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
new file mode 100644
index 0000000000..ed4af9881f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+
+public abstract class AbstractPluginAdapter extends AbstractAdapter implements Plugin
+{
+
+ protected AbstractPluginAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor)
+ {
+ super(id, defaults, attributes, taskExecutor);
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public State getActualState()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public void setDurable(boolean durable) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired) throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes,
+ ConfiguredObject... otherParents)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if (ID.equals(name))
+ {
+ return getId();
+ }
+ else if (STATE.equals(name))
+ {
+ return getActualState();
+ }
+ else if (DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ else if (LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if (TIME_TO_LIVE.equals(name))
+ {
+ return getTimeToLive();
+ }
+ else if (CREATED.equals(name))
+ {
+
+ }
+ else if (UPDATED.equals(name))
+ {
+
+ }
+ return super.getAttribute(name);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
new file mode 100644
index 0000000000..2f7e89bb2b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.KeyStore;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+
+public class AmqpPortAdapter extends PortAdapter
+{
+ private final Broker _broker;
+ private IncomingNetworkTransport _transport;
+
+ public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor)
+ {
+ super(id, broker, attributes, defaultAttributes, taskExecutor);
+ _broker = broker;
+ }
+
+ @Override
+ protected void onActivate()
+ {
+ Collection<Transport> transports = getTransports();
+ Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols());
+
+ SSLContext sslContext = null;
+ if (transports.contains(Transport.SSL))
+ {
+ sslContext = createSslContext();
+ }
+
+ AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
+
+ String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS);
+ if (WILDCARD_ADDRESS.equals(bindingAddress))
+ {
+ bindingAddress = null;
+ }
+ Integer port = (Integer) getAttribute(Port.PORT);
+ InetSocketAddress bindingSocketAddress = null;
+ if ( bindingAddress == null )
+ {
+ bindingSocketAddress = new InetSocketAddress(port);
+ }
+ else
+ {
+ bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
+ }
+
+ final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(
+ bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY),
+ (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE),
+ (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH));
+
+ _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
+ _broker, supported, defaultSupportedProtocolReply);
+
+ _transport.accept(settings, protocolEngineFactory, sslContext);
+ CurrentActor.get().message(BrokerMessages.LISTENING(getTransports().toString(), getPort()));
+ }
+
+ @Override
+ protected void onStop()
+ {
+ if (_transport != null)
+ {
+ CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(getTransports().toString(), getPort()));
+ _transport.close();
+ }
+ }
+
+ private Set<AmqpProtocolVersion> convertFromModelProtocolsToAmqp(Collection<Protocol> modelProtocols)
+ {
+ Set<AmqpProtocolVersion> amqpProtocols = new HashSet<AmqpProtocolVersion>();
+ for (Protocol protocol : modelProtocols)
+ {
+ amqpProtocols.add(protocol.toAmqpProtocolVersion());
+ }
+ return amqpProtocols;
+ }
+
+ private SSLContext createSslContext()
+ {
+ KeyStore keyStore = _broker.getDefaultKeyStore();
+ if (keyStore == null)
+ {
+ throw new IllegalConfigurationException("SSL was requested on AMQP port '"
+ + this.getName() + "' but no key store defined");
+ }
+
+ TrustStore trustStore = _broker.getDefaultTrustStore();
+ if (((Boolean)getAttribute(NEED_CLIENT_AUTH) || (Boolean)getAttribute(WANT_CLIENT_AUTH)) && trustStore == null)
+ {
+ throw new IllegalConfigurationException("Client certificate authentication is enabled on AMQP port '"
+ + this.getName() + "' but no trust store defined");
+ }
+
+ String keystorePath = (String)keyStore.getAttribute(KeyStore.PATH);
+ String keystorePassword = keyStore.getPassword();
+ String keystoreType = (String)keyStore.getAttribute(KeyStore.TYPE);
+ String keyManagerFactoryAlgorithm = (String)keyStore.getAttribute(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM);
+ String certAlias = (String)keyStore.getAttribute(KeyStore.CERTIFICATE_ALIAS);
+
+ final SSLContext sslContext;
+ try
+ {
+ if(trustStore != null)
+ {
+ String trustStorePassword = trustStore.getPassword();
+ String trustStoreType = (String)trustStore.getAttribute(TrustStore.TYPE);
+ String trustManagerFactoryAlgorithm = (String)trustStore.getAttribute(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM);
+ String trustStorePath = (String)trustStore.getAttribute(TrustStore.PATH);
+
+ sslContext = SSLContextFactory.buildClientContext(trustStorePath,
+ trustStorePassword,
+ trustStoreType,
+ trustManagerFactoryAlgorithm,
+ keystorePath,
+ keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
+ certAlias);
+ }
+ else
+ {
+ sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
+ }
+ }
+ catch (GeneralSecurityException e)
+ {
+ throw new RuntimeException("Unable to create SSLContext for key or trust store", e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to create SSLContext - unable to load key/trust store", e);
+ }
+ return sslContext;
+ }
+
+ private AmqpProtocolVersion getDefaultAmqpSupportedReply()
+ {
+ String defaultAmqpSupportedReply = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY);
+ if (defaultAmqpSupportedReply != null)
+ {
+ return AmqpProtocolVersion.valueOf(defaultAmqpSupportedReply);
+ }
+ return null;
+ }
+
+
+ class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
+ {
+ private final InetSocketAddress _bindingSocketAddress;
+ private final Boolean _tcpNoDelay;
+ private final Integer _sendBufferSize;
+ private final Integer _receiveBufferSize;
+ private final boolean _needClientAuth;
+ private final boolean _wantClientAuth;
+
+ public ServerNetworkTransportConfiguration(
+ InetSocketAddress bindingSocketAddress, boolean tcpNoDelay,
+ int sendBufferSize, int receiveBufferSize,
+ boolean needClientAuth, boolean wantClientAuth)
+ {
+ _bindingSocketAddress = bindingSocketAddress;
+ _tcpNoDelay = tcpNoDelay;
+ _sendBufferSize = sendBufferSize;
+ _receiveBufferSize = receiveBufferSize;
+ _needClientAuth = needClientAuth;
+ _wantClientAuth = wantClientAuth;
+ }
+
+ @Override
+ public boolean wantClientAuth()
+ {
+ return _wantClientAuth;
+ }
+
+ @Override
+ public boolean needClientAuth()
+ {
+ return _needClientAuth;
+ }
+
+ @Override
+ public Boolean getTcpNoDelay()
+ {
+ return _tcpNoDelay;
+ }
+
+ @Override
+ public Integer getSendBufferSize()
+ {
+ return _sendBufferSize;
+ }
+
+ @Override
+ public Integer getReceiveBufferSize()
+ {
+ return _receiveBufferSize;
+ }
+
+ @Override
+ public InetSocketAddress getAddress()
+ {
+ return _bindingSocketAddress;
+ }
+ };
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
index 8c2bc98ba7..ac4b0255d5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
@@ -29,38 +29,50 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
import javax.security.auth.login.AccountNotFoundException;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProvider;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.User;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
+import org.apache.qpid.server.security.SecurityManager;
public abstract class AuthenticationProviderAdapter<T extends AuthenticationManager> extends AbstractAdapter implements AuthenticationProvider
{
private static final Logger LOGGER = Logger.getLogger(AuthenticationProviderAdapter.class);
- private final BrokerAdapter _broker;
private final T _authManager;
+ protected final Broker _broker;
- private AuthenticationProviderAdapter(BrokerAdapter brokerAdapter,
- final T authManager)
- {
- super(UUIDGenerator.generateRandomUUID());
- _broker = brokerAdapter;
- _authManager = authManager;
- }
+ private GroupPrincipalAccessor _groupAccessor;
- public static AuthenticationProviderAdapter createAuthenticationProviderAdapter(BrokerAdapter brokerAdapter,
- final AuthenticationManager authManager)
+ private Object _type;
+
+ private AuthenticationProviderAdapter(UUID id, Broker broker, final T authManager, Map<String, Object> attributes)
{
- return authManager instanceof PrincipalDatabaseAuthenticationManager
- ? new PrincipalDatabaseAuthenticationManagerAdapter(brokerAdapter, (PrincipalDatabaseAuthenticationManager) authManager)
- : new SimpleAuthenticationProviderAdapter(brokerAdapter, authManager);
+ super(id, null, attributes, broker.getTaskExecutor());
+ _authManager = authManager;
+ _broker = broker;
+ _type = authManager instanceof PrincipalDatabaseAuthenticationManager? PrincipalDatabaseAuthenticationManager.class.getSimpleName() : AuthenticationManager.class.getSimpleName() ;
+ addParent(Broker.class, broker);
}
T getAuthManager()
@@ -77,7 +89,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
@Override
public String getName()
{
- return _authManager.getClass().getSimpleName();
+ return (String)getAttribute(AuthenticationProvider.NAME);
}
@Override
@@ -147,7 +159,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
{
if(TYPE.equals(name))
{
- return _authManager.getClass().getSimpleName();
+ return _type;
}
else if(CREATED.equals(name))
{
@@ -165,10 +177,6 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
{
return LifetimePolicy.PERMANENT;
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
return State.ACTIVE; // TODO
@@ -191,44 +199,86 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass,
- Map<String, Object> attributes,
- ConfiguredObject... otherParents)
+ public boolean setState(State currentState, State desiredState)
+ throws IllegalStateTransitionException, AccessControlException
{
- return null;
+ if(desiredState == State.DELETED)
+ {
+ return true;
+ }
+ else if(desiredState == State.ACTIVE)
+ {
+ if (_groupAccessor == null)
+ {
+ throw new IllegalStateTransitionException("Cannot transit into ACTIVE state with null group accessor!");
+ }
+ _authManager.initialise();
+ return true;
+ }
+ else if(desiredState == State.STOPPED)
+ {
+ _authManager.close();
+ return true;
+ }
+ return false;
}
- private static class SimpleAuthenticationProviderAdapter extends AuthenticationProviderAdapter<AuthenticationManager>
+ @Override
+ public SubjectCreator getSubjectCreator()
{
+ return new SubjectCreator(_authManager, _groupAccessor);
+ }
+
+ public void setGroupAccessor(GroupPrincipalAccessor groupAccessor)
+ {
+ _groupAccessor = groupAccessor;
+ }
+
+ public static class SimpleAuthenticationProviderAdapter extends AuthenticationProviderAdapter<AuthenticationManager>
+ {
+
public SimpleAuthenticationProviderAdapter(
- BrokerAdapter brokerAdapter, AuthenticationManager authManager)
+ UUID id, Broker broker, AuthenticationManager authManager, Map<String, Object> attributes)
+ {
+ super(id, broker,authManager, attributes);
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+ Map<String, Object> attributes,
+ ConfiguredObject... otherParents)
{
- super(brokerAdapter,authManager);
+ throw new UnsupportedOperationException();
}
}
- private static class PrincipalDatabaseAuthenticationManagerAdapter
+ public static class PrincipalDatabaseAuthenticationManagerAdapter
extends AuthenticationProviderAdapter<PrincipalDatabaseAuthenticationManager>
implements PasswordCredentialManagingAuthenticationProvider
{
public PrincipalDatabaseAuthenticationManagerAdapter(
- BrokerAdapter brokerAdapter, PrincipalDatabaseAuthenticationManager authManager)
+ UUID id, Broker broker, PrincipalDatabaseAuthenticationManager authManager, Map<String, Object> attributes)
{
- super(brokerAdapter, authManager);
+ super(id, broker, authManager, attributes);
}
@Override
public boolean createUser(String username, String password, Map<String, String> attributes)
{
- return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray());
+ if(getSecurityManager().authoriseUserOperation(Operation.CREATE, username))
+ {
+ return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray());
+ }
+ else
+ {
+ throw new AccessControlException("Do not have permission to create new user");
+ }
}
@Override
public void deleteUser(String username) throws AccountNotFoundException
{
- if(getSecurityManager().authoriseMethod(Operation.DELETE,
- "UserManagement",
- "deleteUser"))
+ if(getSecurityManager().authoriseUserOperation(Operation.DELETE, username))
{
getPrincipalDatabase().deletePrincipal(new UsernamePrincipal(username));
@@ -239,9 +289,9 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
}
}
- private org.apache.qpid.server.security.SecurityManager getSecurityManager()
+ private SecurityManager getSecurityManager()
{
- return ApplicationRegistry.getInstance().getSecurityManager();
+ return _broker.getSecurityManager();
}
private PrincipalDatabase getPrincipalDatabase()
@@ -252,18 +302,13 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
@Override
public void setPassword(String username, String password) throws AccountNotFoundException
{
- getPrincipalDatabase().updatePassword(new UsernamePrincipal(username), password.toCharArray());
- }
-
- public void reload() throws IOException
- {
- if(getSecurityManager().authoriseMethod(Operation.UPDATE, "UserManagement", "reload"))
+ if(getSecurityManager().authoriseUserOperation(Operation.UPDATE, username))
{
- getPrincipalDatabase().reload();
+ getPrincipalDatabase().updatePassword(new UsernamePrincipal(username), password.toCharArray());
}
else
{
- throw new AccessControlException("Do not have permission to reload principal database");
+ throw new AccessControlException("Do not have permission to set password");
}
}
@@ -274,34 +319,41 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
Map<String, Map<String,String>> users = new HashMap<String, Map<String, String>>();
for(Principal principal : getPrincipalDatabase().getUsers())
{
- users.put(principal.getName(), Collections.EMPTY_MAP);
+ users.put(principal.getName(), Collections.<String, String>emptyMap());
}
return users;
}
+ public void reload() throws IOException
+ {
+ getPrincipalDatabase().reload();
+ }
+
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes,
ConfiguredObject... otherParents)
{
if(childClass == User.class)
{
- Principal p = new UsernamePrincipal((String) attributes.get("name"));
- if(getSecurityManager().authoriseMethod(Operation.UPDATE, "UserManagement", "createUser"))
+ String username = (String) attributes.get("name");
+ String password = (String) attributes.get("password");
+ Principal p = new UsernamePrincipal(username);
+
+ if(createUser(username, password,null))
{
- if(getPrincipalDatabase().createPrincipal(p, ((String)attributes.get("password")).toCharArray()))
- {
- return (C) new PrincipalAdapter(p);
- }
+ @SuppressWarnings("unchecked")
+ C pricipalAdapter = (C) new PrincipalAdapter(p, getTaskExecutor());
+ return pricipalAdapter;
}
else
{
- throw new AccessControlException("Do not have permission to create a new user");
+ //TODO? Silly interface on the PrincipalDatabase at fault
+ throw new RuntimeException("Failed to create user");
}
-
}
- return super.createChild(childClass, attributes, otherParents);
+ return super.addChild(childClass, attributes, otherParents);
}
@Override
@@ -313,9 +365,11 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
Collection<User> principals = new ArrayList<User>(users.size());
for(Principal user : users)
{
- principals.add(new PrincipalAdapter(user));
+ principals.add(new PrincipalAdapter(user, getTaskExecutor()));
}
- return (Collection<C>) Collections.unmodifiableCollection(principals);
+ @SuppressWarnings("unchecked")
+ Collection<C> unmodifiablePrincipals = (Collection<C>) Collections.unmodifiableCollection(principals);
+ return unmodifiablePrincipals;
}
else
{
@@ -328,20 +382,14 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
private final Principal _user;
- public PrincipalAdapter(Principal user)
+ public PrincipalAdapter(Principal user, TaskExecutor taskExecutor)
{
- super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName()));
+ super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName()), taskExecutor);
_user = user;
}
@Override
- public String getPassword()
- {
- return null;
- }
-
- @Override
public void setPassword(String password)
{
try
@@ -445,6 +493,10 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
{
return getId();
}
+ else if(PASSWORD.equals(name))
+ {
+ return null; // for security reasons we don't expose the password
+ }
else if(NAME.equals(name))
{
return getName();
@@ -453,20 +505,19 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
+ public boolean changeAttribute(String name, Object expected, Object desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
if(name.equals(PASSWORD))
{
setPassword((String)desired);
+ return true;
}
- return super.setAttribute(name,
- expected,
- desired);
+ return super.changeAttribute(name, expected, desired);
}
@Override
- public State setDesiredState(State currentState, State desiredState)
+ protected boolean setState(State currentState, State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
if(desiredState == State.DELETED)
@@ -479,9 +530,9 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
{
LOGGER.warn("Failed to delete user " + _user, e);
}
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java
new file mode 100644
index 0000000000..e5108ebbcf
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderFactory.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.plugin.AuthenticationManagerFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
+import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.PrincipalDatabaseAuthenticationManagerAdapter;
+import org.apache.qpid.server.model.adapter.AuthenticationProviderAdapter.SimpleAuthenticationProviderAdapter;
+
+public class AuthenticationProviderFactory
+{
+ private final Iterable<AuthenticationManagerFactory> _factories;
+
+ public AuthenticationProviderFactory(QpidServiceLoader<AuthenticationManagerFactory> authManagerFactoryServiceLoader)
+ {
+ _factories = authManagerFactoryServiceLoader.atLeastOneInstanceOf(AuthenticationManagerFactory.class);
+ }
+
+ /**
+ * Creates {@link AuthenticationProvider} for given ID, {@link Broker} and attributes.
+ * <p>
+ * The configured {@link AuthenticationManagerFactory}'s are used to try to create the {@link AuthenticationProvider}.
+ * The first non-null instance is returned. The factories are used in non-deterministic order.
+ * @param groupPrincipalAccessor TODO
+ */
+ public AuthenticationProvider create(UUID id, Broker broker, Map<String, Object> attributes, GroupPrincipalAccessor groupPrincipalAccessor)
+ {
+ for (AuthenticationManagerFactory factory : _factories)
+ {
+ AuthenticationManager manager = factory.createInstance(attributes);
+ if (manager != null)
+ {
+ AuthenticationProviderAdapter<?> authenticationProvider;
+ if (manager instanceof PrincipalDatabaseAuthenticationManager)
+ {
+ authenticationProvider = new PrincipalDatabaseAuthenticationManagerAdapter(id, broker,
+ (PrincipalDatabaseAuthenticationManager) manager, attributes);
+ }
+ else
+ {
+ authenticationProvider = new SimpleAuthenticationProviderAdapter(id, broker, manager, attributes);
+ }
+ authenticationProvider.setGroupAccessor(groupPrincipalAccessor);
+ return authenticationProvider;
+ }
+ }
+
+ throw new IllegalArgumentException("No authentication provider factory found for configuration attributes " + attributes);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
index abd3160686..eb2d0dd7e2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
@@ -48,7 +48,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding
ExchangeAdapter exchangeAdapter,
QueueAdapter queueAdapter)
{
- super(binding.getId());
+ super(binding.getId(), queueAdapter.getTaskExecutor());
_binding = binding;
_exchange = exchangeAdapter;
_queue = queueAdapter;
@@ -206,27 +206,20 @@ final class BindingAdapter extends AbstractAdapter implements Binding
}
@Override
- public Object setAttribute(final String name, final Object expected, final Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO
- }
-
- @Override
public Collection<String> getAttributeNames()
{
return Binding.AVAILABLE_ATTRIBUTES;
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException,
AccessControlException
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index f1cce2d45c..533ecfe937 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -21,82 +21,211 @@
package org.apache.qpid.server.model.adapter;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
+import javax.net.ssl.KeyManagerFactory;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.logging.LogRecorder;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.GroupProvider;
+import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.group.FileGroupManager;
+import org.apache.qpid.server.security.group.GroupManager;
+import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHostRegistry.RegistryChangeListener,
- IApplicationRegistry.PortBindingListener,
- IAuthenticationManagerRegistry.RegistryChangeListener
+public class BrokerAdapter extends AbstractAdapter implements Broker, ConfigurationChangeListener
{
-
- private final IApplicationRegistry _applicationRegistry;
- private String _name;
- private final Map<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter> _vhostAdapters =
- new HashMap<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter>();
- private final StatisticsAdapter _statistics;
- private final Map<QpidAcceptor, PortAdapter> _portAdapters = new HashMap<QpidAcceptor, PortAdapter>();
- private Collection<HTTPPortAdapter> _httpManagementPorts;
-
- private final Map<AuthenticationManager, AuthenticationProviderAdapter> _authManagerAdapters =
- new HashMap<AuthenticationManager, AuthenticationProviderAdapter>();
-
-
- public BrokerAdapter(final IApplicationRegistry instance)
- {
- super(UUIDGenerator.generateRandomUUID());
- _applicationRegistry = instance;
- _name = "Broker";
- _statistics = new StatisticsAdapter(instance);
-
- instance.getVirtualHostRegistry().addRegistryChangeListener(this);
- populateVhosts();
- instance.addPortBindingListener(this);
- populatePorts();
- instance.addRegistryChangeListener(this);
- populateAuthenticationManagers();
- }
-
- private void populateVhosts()
- {
- synchronized(_vhostAdapters)
- {
- Collection<org.apache.qpid.server.virtualhost.VirtualHost> actualVhosts =
- _applicationRegistry.getVirtualHostRegistry().getVirtualHosts();
- for(org.apache.qpid.server.virtualhost.VirtualHost vh : actualVhosts)
- {
- if(!_vhostAdapters.containsKey(vh))
- {
- _vhostAdapters.put(vh, new VirtualHostAdapter(this, vh));
- }
- }
-
+ private static final Logger LOGGER = Logger.getLogger(BrokerAdapter.class);
+
+ @SuppressWarnings("serial")
+ public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{
+ put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_COUNT, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
+ put(ALERT_REPEAT_GAP, Long.class);
+ put(FLOW_CONTROL_SIZE_BYTES, Long.class);
+ put(FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class);
+ put(HOUSEKEEPING_CHECK_PERIOD, Long.class);
+
+ put(DEAD_LETTER_QUEUE_ENABLED, Boolean.class);
+ put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class);
+
+ put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
+ put(SESSION_COUNT_LIMIT, Integer.class);
+ put(HEART_BEAT_DELAY, Integer.class);
+ put(STATISTICS_REPORTING_PERIOD, Integer.class);
+
+ put(ACL_FILE, String.class);
+ put(NAME, String.class);
+ put(DEFAULT_VIRTUAL_HOST, String.class);
+ put(DEFAULT_AUTHENTICATION_PROVIDER, String.class);
+
+ put(KEY_STORE_PATH, String.class);
+ put(KEY_STORE_PASSWORD, String.class);
+ put(KEY_STORE_CERT_ALIAS, String.class);
+ put(TRUST_STORE_PATH, String.class);
+ put(TRUST_STORE_PASSWORD, String.class);
+ put(GROUP_FILE, String.class);
+ }});
+
+ public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
+ public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false;
+ public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l;
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l;
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l;
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l;
+ public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l;
+ public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
+ public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0;
+ public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l;
+ public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l;
+ public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l;
+ public static final int DEFAULT_HEART_BEAT_DELAY = 0;
+ public static final int DEFAULT_SESSION_COUNT_LIMIT = 256;
+ public static final String DEFAULT_NAME = "QpidBroker";
+ private static final String DEFAULT_KEY_STORE_NAME = "defaultKeyStore";
+ private static final String DEFAULT_TRUST_STORE_NAME = "defaultTrustStore";
+ private static final String DEFAULT_GROUP_PROFIDER_NAME = "defaultGroupProvider";
+
+ private static final String DUMMY_PASSWORD_MASK = "********";
+
+ @SuppressWarnings("serial")
+ private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{
+ put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD);
+ put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED);
+ put(Broker.ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_COUNT, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE);
+ put(Broker.ALERT_THRESHOLD_QUEUE_DEPTH, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH);
+ put(Broker.DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED);
+ put(Broker.MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS);
+ put(Broker.FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES);
+ put(Broker.FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES);
+ put(Broker.HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
+ put(Broker.HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY);
+ put(Broker.SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT);
+ put(Broker.NAME, DEFAULT_NAME);
+ }});
+
+
+
+
+ private final StatisticsGatherer _statisticsGatherer;
+ private final VirtualHostRegistry _virtualHostRegistry;
+ private final LogRecorder _logRecorder;
+ private final RootMessageLogger _rootMessageLogger;
+ private StatisticsAdapter _statistics;
+
+ private final Map<String, VirtualHost> _vhostAdapters = new HashMap<String, VirtualHost>();
+ private final Map<Integer, Port> _portAdapters = new HashMap<Integer, Port>();
+ private final Map<String, AuthenticationProvider> _authenticationProviders = new HashMap<String, AuthenticationProvider>();
+ private final Map<String, GroupProvider> _groupProviders = new HashMap<String, GroupProvider>();
+ private final Map<UUID, ConfiguredObject> _plugins = new HashMap<UUID, ConfiguredObject>();
+ private final Map<UUID, KeyStore> _keyStores = new HashMap<UUID, KeyStore>();
+ private final Map<UUID, TrustStore> _trustStores = new HashMap<UUID, TrustStore>();
+
+ private final AuthenticationProviderFactory _authenticationProviderFactory;
+ private AuthenticationProvider _defaultAuthenticationProvider;
+
+ private final PortFactory _portFactory;
+ private final SecurityManager _securityManager;
+ private final UUID _defaultKeyStoreId;
+ private final UUID _defaultTrustStoreId;
+
+ public BrokerAdapter(UUID id, Map<String, Object> attributes, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry,
+ LogRecorder logRecorder, RootMessageLogger rootMessageLogger, AuthenticationProviderFactory authenticationProviderFactory,
+ PortFactory portFactory, TaskExecutor taskExecutor)
+ {
+ super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor);
+ _statisticsGatherer = statisticsGatherer;
+ _virtualHostRegistry = virtualHostRegistry;
+ _logRecorder = logRecorder;
+ _rootMessageLogger = rootMessageLogger;
+ _statistics = new StatisticsAdapter(statisticsGatherer);
+ _authenticationProviderFactory = authenticationProviderFactory;
+ _portFactory = portFactory;
+ _securityManager = new SecurityManager((String)getAttribute(ACL_FILE));
+
+ _defaultKeyStoreId = UUIDGenerator.generateBrokerChildUUID(KeyStore.class.getSimpleName(), DEFAULT_KEY_STORE_NAME);
+ _defaultTrustStoreId = UUIDGenerator.generateBrokerChildUUID(TrustStore.class.getSimpleName(), DEFAULT_TRUST_STORE_NAME);
+ createBrokerChildrenFromAttributes();
+ }
+
+ /*
+ * A temporary method to create broker children that can be only configured via broker attributes
+ */
+ private void createBrokerChildrenFromAttributes()
+ {
+ String groupFile = (String) getAttribute(GROUP_FILE);
+ if (groupFile != null)
+ {
+ GroupManager groupManager = new FileGroupManager(groupFile);
+ UUID groupProviderId = UUIDGenerator.generateBrokerChildUUID(GroupProvider.class.getSimpleName(),
+ DEFAULT_GROUP_PROFIDER_NAME);
+ GroupProviderAdapter groupProviderAdapter = new GroupProviderAdapter(groupProviderId, groupManager, this);
+ addGroupProvider(groupProviderAdapter);
+ }
+ Map<String, Object> actualAttributes = getActualAttributes();
+ String keyStorePath = (String) getAttribute(KEY_STORE_PATH);
+ if (keyStorePath != null)
+ {
+ Map<String, Object> keyStoreAttributes = new HashMap<String, Object>();
+ keyStoreAttributes.put(KeyStore.NAME, DEFAULT_KEY_STORE_NAME);
+ keyStoreAttributes.put(KeyStore.PATH, keyStorePath);
+ keyStoreAttributes.put(KeyStore.PASSWORD, (String) actualAttributes.get(KEY_STORE_PASSWORD));
+ keyStoreAttributes.put(KeyStore.TYPE, java.security.KeyStore.getDefaultType());
+ keyStoreAttributes.put(KeyStore.CERTIFICATE_ALIAS, getAttribute(KEY_STORE_CERT_ALIAS));
+ keyStoreAttributes.put(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm());
+ KeyStoreAdapter KeyStoreAdapter = new KeyStoreAdapter(_defaultKeyStoreId, this, keyStoreAttributes);
+ addKeyStore(KeyStoreAdapter);
+ }
+ String trustStorePath = (String) getAttribute(TRUST_STORE_PATH);
+ if (trustStorePath != null)
+ {
+ Map<String, Object> trsustStoreAttributes = new HashMap<String, Object>();
+ trsustStoreAttributes.put(TrustStore.NAME, DEFAULT_TRUST_STORE_NAME);
+ trsustStoreAttributes.put(TrustStore.PATH, trustStorePath);
+ trsustStoreAttributes.put(TrustStore.PASSWORD, (String) actualAttributes.get(TRUST_STORE_PASSWORD));
+ trsustStoreAttributes.put(TrustStore.TYPE, java.security.KeyStore.getDefaultType());
+ trsustStoreAttributes.put(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm());
+ TrustStoreAdapter trustStore = new TrustStoreAdapter(_defaultTrustStoreId, this, trsustStoreAttributes);
+ addTrustStore(trustStore);
}
}
-
public Collection<VirtualHost> getVirtualHosts()
{
synchronized(_vhostAdapters)
@@ -105,81 +234,57 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
}
}
- private void populatePorts()
+
+ public Collection<Port> getPorts()
{
synchronized (_portAdapters)
{
- Map<InetSocketAddress, QpidAcceptor> acceptors = _applicationRegistry.getAcceptors();
-
- for(Map.Entry<InetSocketAddress, QpidAcceptor> entry : acceptors.entrySet())
- {
- if(!_portAdapters.containsKey(entry.getValue()))
- {
- _portAdapters.put(entry.getValue(), new PortAdapter(this, entry.getValue(), entry.getKey()));
- }
- }
- if(_applicationRegistry.useHTTPManagement() || _applicationRegistry.useHTTPSManagement())
- {
- ArrayList<HTTPPortAdapter> httpPorts = new ArrayList<HTTPPortAdapter>();
- if (_applicationRegistry.useHTTPManagement())
- {
- httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPManagementPort()));
- }
- if (_applicationRegistry.useHTTPSManagement())
- {
- httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPSManagementPort(), Protocol.HTTPS, Transport.SSL));
- }
- _httpManagementPorts = Collections.unmodifiableCollection(httpPorts);
- }
+ final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values());
+ return ports;
}
}
- public Collection<Port> getPorts()
+ public Collection<AuthenticationProvider> getAuthenticationProviders()
{
- synchronized (_portAdapters)
+ synchronized (_authenticationProviders)
{
- final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values());
- if(_httpManagementPorts != null)
- {
- ports.addAll(_httpManagementPorts);
- }
- return ports;
+ return new ArrayList<AuthenticationProvider>(_authenticationProviders.values());
}
}
- private void populateAuthenticationManagers()
+ public AuthenticationProvider getAuthenticationProviderByName(String authenticationProviderName)
{
- synchronized (_authManagerAdapters)
+ Collection<AuthenticationProvider> providers = getAuthenticationProviders();
+ for (AuthenticationProvider authenticationProvider : providers)
{
- IAuthenticationManagerRegistry authenticationManagerRegistry =
- _applicationRegistry.getAuthenticationManagerRegistry();
- if(authenticationManagerRegistry != null)
+ if (authenticationProvider.getName().equals(authenticationProviderName))
{
- Map<String, AuthenticationManager> authenticationManagers =
- authenticationManagerRegistry.getAvailableAuthenticationManagers();
-
- for(Map.Entry<String, AuthenticationManager> entry : authenticationManagers.entrySet())
- {
- if(!_authManagerAdapters.containsKey(entry.getValue()))
- {
- _authManagerAdapters.put(entry.getValue(),
- AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this,
- entry.getValue()));
- }
- }
+ return authenticationProvider;
}
}
+ return null;
}
- public Collection<AuthenticationProvider> getAuthenticationProviders()
+ @Override
+ public AuthenticationProvider getDefaultAuthenticationProvider()
+ {
+ return _defaultAuthenticationProvider;
+ }
+
+ public void setDefaultAuthenticationProvider(AuthenticationProvider provider)
+ {
+ _defaultAuthenticationProvider = provider;
+ }
+
+ @Override
+ public Collection<GroupProvider> getGroupProviders()
{
- synchronized (_authManagerAdapters)
+ synchronized (_groupProviders)
{
- final ArrayList<AuthenticationProvider> authManagers =
- new ArrayList<AuthenticationProvider>(_authManagerAdapters.values());
- return authManagers;
+ final ArrayList<GroupProvider> groupManagers =
+ new ArrayList<GroupProvider>(_groupProviders.values());
+ return groupManagers;
}
-
}
public VirtualHost createVirtualHost(final String name,
@@ -193,22 +298,29 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
return null; //TODO
}
- public VirtualHost createVirtualHost(final Map<String, Object> attributes)
+ private VirtualHost createVirtualHost(final Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException
{
- return null; //TODO
+ final VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(UUID.randomUUID(), attributes, this,
+ _statisticsGatherer, getTaskExecutor());
+ addVirtualHost(virtualHostAdapter);
+ virtualHostAdapter.setDesiredState(State.INITIALISING, State.ACTIVE);
+ return virtualHostAdapter;
}
- public void deleteVirtualHost(final VirtualHost vhost)
- throws AccessControlException, IllegalStateException
+ private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException
{
- //TODO
- throw new UnsupportedOperationException("Not yet implemented");
+ synchronized (_vhostAdapters)
+ {
+ _vhostAdapters.remove(vhost);
+ }
+ vhost.removeChangeListener(this);
+ return true;
}
public String getName()
{
- return _name;
+ return (String)getAttribute(NAME);
}
public String setName(final String currentName, final String desiredName)
@@ -262,6 +374,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
return _statistics;
}
+ @SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
@@ -277,12 +390,30 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
{
return (Collection<C>) getAuthenticationProviders();
}
+ else if(clazz == GroupProvider.class)
+ {
+ return (Collection<C>) getGroupProviders();
+ }
+ else if(clazz == KeyStore.class)
+ {
+ return (Collection<C>) getKeyStores();
+ }
+ else if(clazz == TrustStore.class)
+ {
+ return (Collection<C>) getTrustStores();
+ }
+ else if(clazz == Plugin.class)
+ {
+ return (Collection<C>) getPlugins();
+ }
return Collections.emptySet();
}
+ //TODO: ACL
+ @SuppressWarnings("unchecked")
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == VirtualHost.class)
{
@@ -302,111 +433,107 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
}
}
- private Port createPort(Map<String, Object> attributes)
+ private void addPort(Port port)
{
- // TODO
- return null;
+ synchronized (_portAdapters)
+ {
+ int portNumber = port.getPort();
+ if(_portAdapters.containsKey(portNumber))
+ {
+ throw new IllegalArgumentException("Cannot add port " + port + " because port number " + portNumber + " already configured");
+ }
+ _portAdapters.put(portNumber, port);
+ }
+ port.addChangeListener(this);
}
- private AuthenticationProvider createAuthenticationProvider(Map<String,Object> attributes)
+ private Port createPort(Map<String, Object> attributes)
{
- // TODO
- return null;
+ Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes);
+ addPort(port);
+ return port;
}
+ private AuthenticationProvider createAuthenticationProvider(Map<String, Object> attributes)
+ {
+ // it's cheap to create the groupPrincipalAccessor on the fly
+ GroupPrincipalAccessor groupPrincipalAccessor = new GroupPrincipalAccessor(_groupProviders.values());
+
+ AuthenticationProvider authenticationProvider = _authenticationProviderFactory.create(UUID.randomUUID(), this, attributes, groupPrincipalAccessor);
+ addAuthenticationProvider(authenticationProvider);
+ return authenticationProvider;
+ }
- public void virtualHostRegistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+ /**
+ * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists
+ */
+ private void addAuthenticationProvider(AuthenticationProvider authenticationProvider)
{
- VirtualHostAdapter adapter = null;
- synchronized (_vhostAdapters)
+ String name = authenticationProvider.getName();
+ synchronized (_authenticationProviders)
{
- if(!_vhostAdapters.containsKey(virtualHost))
+ if(_authenticationProviders.containsKey(name))
{
- adapter = new VirtualHostAdapter(this, virtualHost);
- _vhostAdapters.put(virtualHost, adapter);
+ throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists");
}
+ _authenticationProviders.put(name, authenticationProvider);
}
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ authenticationProvider.addChangeListener(this);
}
- public void virtualHostUnregistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+ private void addGroupProvider(GroupProvider groupProvider)
{
- VirtualHostAdapter adapter = null;
-
- synchronized (_vhostAdapters)
- {
- adapter = _vhostAdapters.remove(virtualHost);
- }
- if(adapter != null)
+ synchronized (_groupProviders)
{
- childRemoved(adapter);
+ String name = groupProvider.getName();
+ if(_groupProviders.containsKey(name))
+ {
+ throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists");
+ }
+ _groupProviders.put(name, groupProvider);
}
+ groupProvider.addChangeListener(this);
}
- @Override
- public void authenticationManagerRegistered(AuthenticationManager authenticationManager)
+ private boolean deleteGroupProvider(GroupProvider object)
+ {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
+ private void addKeyStore(KeyStore keyStore)
{
- AuthenticationProviderAdapter adapter = null;
- synchronized (_authManagerAdapters)
+ synchronized (_keyStores)
{
- if(!_authManagerAdapters.containsKey(authenticationManager))
+ if(_keyStores.containsKey(keyStore.getId()))
{
- adapter =
- AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, authenticationManager);
- _authManagerAdapters.put(authenticationManager, adapter);
+ throw new IllegalConfigurationException("Cannot add KeyStore because one with id " + keyStore.getId() + " already exists");
}
+ _keyStores.put(keyStore.getId(), keyStore);
}
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ keyStore.addChangeListener(this);
}
- @Override
- public void authenticationManagerUnregistered(AuthenticationManager authenticationManager)
+ private boolean deleteKeyStore(KeyStore object)
{
- AuthenticationProviderAdapter adapter;
- synchronized (_authManagerAdapters)
- {
- adapter = _authManagerAdapters.remove(authenticationManager);
- }
- if(adapter != null)
- {
- childRemoved(adapter);
- }
+ throw new UnsupportedOperationException("Not implemented yet!");
}
-
- @Override
- public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress)
+ private void addTrustStore(TrustStore trustStore)
{
- synchronized (_portAdapters)
+ synchronized (_trustStores)
{
- if(!_portAdapters.containsKey(acceptor))
+ if(_trustStores.containsKey(trustStore.getId()))
{
- PortAdapter adapter = new PortAdapter(this, acceptor, bindAddress);
- _portAdapters.put(acceptor, adapter);
- childAdded(adapter);
+ throw new IllegalConfigurationException("Cannot add TrustStore because one with id " + trustStore.getId() + " already exists");
}
+ _trustStores.put(trustStore.getId(), trustStore);
}
+ trustStore.addChangeListener(this);
}
- @Override
- public void unbound(QpidAcceptor acceptor)
+ private boolean deleteTrustStore(TrustStore object)
{
- PortAdapter adapter = null;
-
- synchronized (_portAdapters)
- {
- adapter = _portAdapters.remove(acceptor);
- }
- if(adapter != null)
- {
- childRemoved(adapter);
- }
+ throw new UnsupportedOperationException("Not implemented yet!");
}
@Override
@@ -422,10 +549,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
{
return getId();
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
return State.ACTIVE;
@@ -481,14 +604,316 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos
{
// TODO
}
+ else if (DEFAULT_AUTHENTICATION_PROVIDER.equals(name))
+ {
+ return _defaultAuthenticationProvider == null ? null : _defaultAuthenticationProvider.getName();
+ }
+ else if (KEY_STORE_PASSWORD.equals(name))
+ {
+ return DUMMY_PASSWORD_MASK;
+ }
+ else if (TRUST_STORE_PASSWORD.equals(name))
+ {
+ return DUMMY_PASSWORD_MASK;
+ }
+ return super.getAttribute(name);
+ }
- return super.getAttribute(name); //TODO - Implement.
+ private boolean deletePort(Port portAdapter)
+ {
+ Port removedPort = null;
+ synchronized (_portAdapters)
+ {
+ removedPort = _portAdapters.remove(portAdapter.getPort());
+ }
+ return removedPort != null;
+ }
+
+ private boolean deleteAuthenticationProvider(AuthenticationProvider authenticationProvider)
+ {
+ AuthenticationProvider removedAuthenticationProvider = null;
+ synchronized (_authenticationProviders)
+ {
+ removedAuthenticationProvider = _authenticationProviders.remove(authenticationProvider.getName());
+ }
+ return removedAuthenticationProvider != null;
+ }
+
+ private void addVirtualHost(VirtualHost virtualHost)
+ {
+ synchronized (_vhostAdapters)
+ {
+ String name = virtualHost.getName();
+ if (_vhostAdapters.containsKey(name))
+ {
+ throw new IllegalConfigurationException("Virtual host with name " + name + " is already specified!");
+ }
+ _vhostAdapters.put(name, virtualHost);
+ }
+ virtualHost.addChangeListener(this);
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE)
+ {
+ changeState(_groupProviders, currentState, State.ACTIVE, false);
+ changeState(_authenticationProviders, currentState, State.ACTIVE, false);
+
+ CurrentActor.set(new BrokerActor(getRootMessageLogger()));
+ try
+ {
+ changeState(_vhostAdapters, currentState, State.ACTIVE, false);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+
+ changeState(_portAdapters, currentState,State.ACTIVE, false);
+ changeState(_plugins, currentState,State.ACTIVE, false);
+ return true;
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ changeState(_plugins, currentState,State.STOPPED, true);
+ changeState(_portAdapters, currentState, State.STOPPED, true);
+ changeState(_vhostAdapters,currentState, State.STOPPED, true);
+ changeState(_authenticationProviders, currentState, State.STOPPED, true);
+ changeState(_groupProviders, currentState, State.STOPPED, true);
+ return true;
+ }
+ return false;
+ }
+
+ private void changeState(Map<?, ? extends ConfiguredObject> configuredObjectMap, State currentState, State desiredState, boolean swallowException)
+ {
+ synchronized(configuredObjectMap)
+ {
+ Collection<? extends ConfiguredObject> adapters = configuredObjectMap.values();
+ for (ConfiguredObject configuredObject : adapters)
+ {
+ if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getActualState()))
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(configuredObject + " cannot be activated as it is " +State.QUIESCED);
+ }
+ continue;
+ }
+ try
+ {
+ configuredObject.setDesiredState(currentState, desiredState);
+ }
+ catch(RuntimeException e)
+ {
+ if (swallowException)
+ {
+ LOGGER.error("Failed to stop " + configuredObject, e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ boolean childDeleted = false;
+ if(object instanceof AuthenticationProvider)
+ {
+ childDeleted = deleteAuthenticationProvider((AuthenticationProvider)object);
+ }
+ else if(object instanceof Port)
+ {
+ childDeleted = deletePort((Port)object);
+ }
+ else if(object instanceof VirtualHost)
+ {
+ childDeleted = deleteVirtualHost((VirtualHost)object);
+ }
+ else if(object instanceof GroupProvider)
+ {
+ childDeleted = deleteGroupProvider((GroupProvider)object);
+ }
+ else if(object instanceof KeyStore)
+ {
+ childDeleted = deleteKeyStore((KeyStore)object);
+ }
+ else if(object instanceof TrustStore)
+ {
+ childDeleted = deleteTrustStore((TrustStore)object);
+ }
+ if(childDeleted)
+ {
+ childRemoved(object);
+ }
+ }
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // no-op
+ }
+
+ private void addPlugin(ConfiguredObject plugin)
+ {
+ synchronized(_plugins)
+ {
+ if (_plugins.containsKey(plugin.getId()))
+ {
+ throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!");
+ }
+ _plugins.put(plugin.getId(), plugin);
+ }
+ plugin.addChangeListener(this);
+ }
+
+
+ private Collection<ConfiguredObject> getPlugins()
+ {
+ synchronized(_plugins)
+ {
+ return Collections.unmodifiableCollection(_plugins.values());
+ }
+ }
+
+ public void recoverChild(ConfiguredObject object)
+ {
+ if(object instanceof AuthenticationProvider)
+ {
+ addAuthenticationProvider((AuthenticationProvider)object);
+ }
+ else if(object instanceof Port)
+ {
+ addPort((Port)object);
+ }
+ else if(object instanceof VirtualHost)
+ {
+ addVirtualHost((VirtualHost)object);
+ }
+ else if(object instanceof GroupProvider)
+ {
+ addGroupProvider((GroupProvider)object);
+ }
+ else if(object instanceof KeyStore)
+ {
+ addKeyStore((KeyStore)object);
+ }
+ else if(object instanceof TrustStore)
+ {
+ addTrustStore((TrustStore)object);
+ }
+ else if(object instanceof Plugin)
+ {
+ addPlugin(object);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Attempted to recover unexpected type of configured object: " + object.getClass().getName());
+ }
+ }
+
+ @Override
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
+ @Override
+ public SecurityManager getSecurityManager()
+ {
+ return _securityManager;
+ }
+
+ @Override
+ public LogRecorder getLogRecorder()
+ {
+ return _logRecorder;
+ }
+
+ @Override
+ public VirtualHost findVirtualHostByName(String name)
+ {
+ return _vhostAdapters.get(name);
+ }
+
+ @Override
+ public SubjectCreator getSubjectCreator(SocketAddress localAddress)
+ {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress;
+ AuthenticationProvider provider = _defaultAuthenticationProvider;
+ Collection<Port> ports = getPorts();
+ for (Port p : ports)
+ {
+ if (inetSocketAddress.getPort() == p.getPort())
+ {
+ provider = p.getAuthenticationProvider();
+ break;
+ }
+ }
+ return provider.getSubjectCreator();
+ }
+
+ @Override
+ public Collection<KeyStore> getKeyStores()
+ {
+ synchronized(_trustStores)
+ {
+ return Collections.unmodifiableCollection(_keyStores.values());
+ }
+ }
+
+ @Override
+ public Collection<TrustStore> getTrustStores()
+ {
+ synchronized(_trustStores)
+ {
+ return Collections.unmodifiableCollection(_trustStores.values());
+ }
+ }
+
+ @Override
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ @Override
+ public KeyStore getDefaultKeyStore()
+ {
+ return _keyStores.get(_defaultKeyStoreId);
+ }
+
+ @Override
+ public TrustStore getDefaultTrustStore()
+ {
+ return _trustStores.get(_defaultTrustStoreId);
+ }
+
+ @Override
+ public TaskExecutor getTaskExecutor()
{
- return super.setAttribute(name, expected, desired); //TODO - Implement.
+ return super.getTaskExecutor();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 5439f6a560..84f99e1f17 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -50,9 +51,9 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
new HashMap<AMQSessionModel, SessionAdapter>();
private final Statistics _statistics;
- public ConnectionAdapter(final AMQConnectionModel conn)
+ public ConnectionAdapter(final AMQConnectionModel conn, TaskExecutor taskExecutor)
{
- super(UUIDGenerator.generateRandomUUID());
+ super(UUIDGenerator.generateRandomUUID(), taskExecutor);
_connection = conn;
_statistics = new ConnectionStatisticsAdapter(conn);
}
@@ -74,7 +75,7 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
{
if(!_sessionAdapters.containsKey(session))
{
- _sessionAdapters.put(session, new SessionAdapter(session));
+ _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor()));
}
}
return new ArrayList<Session>(_sessionAdapters.values());
@@ -199,52 +200,6 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
}
@Override
- public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- if(name.equals(CLIENT_ID))
- {
-
- }
- else if(name.equals(CLIENT_VERSION))
- {
-
- }
- else if(name.equals(INCOMING))
- {
-
- }
- else if(name.equals(LOCAL_ADDRESS))
- {
-
- }
- else if(name.equals(PRINCIPAL))
- {
-
- }
- else if(name.equals(PROPERTIES))
- {
-
- }
- else if(name.equals(REMOTE_ADDRESS))
- {
-
- }
- else if(name.equals(REMOTE_PROCESS_NAME))
- {
-
- }
- else if(name.equals(REMOTE_PROCESS_PID))
- {
-
- }
- else if(name.equals(SESSION_COUNT_LIMIT))
- {
-
- }
- return super.setAttribute(name, expected, desired);
- }
-
- @Override
public Collection<String> getAttributeNames()
{
final HashSet<String> attrNames = new HashSet<String>(super.getAttributeNames());
@@ -270,7 +225,8 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
}
}
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ @Override
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == Session.class)
{
@@ -310,4 +266,11 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
return super.getStatistic(name);
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO: add state management
+ return false;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
index 031d518670..e6d3fab2f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
@@ -45,7 +45,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
queueAdapter.getName(),
subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()));
+ subscription.getConsumerName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
_statistics = new ConsumerStatistics();
@@ -108,13 +108,6 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
}
@Override
- public Object setAttribute(final String name, final Object expected, final Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO
- }
-
- @Override
public Object getAttribute(final String name)
{
if(ID.equals(name))
@@ -222,4 +215,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
return null; // TODO - Implement
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO : Add state management
+ return false;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
index df0f29fbc3..5d5f3f7378 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
@@ -33,16 +33,15 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
@@ -57,7 +56,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
public ExchangeAdapter(final VirtualHostAdapter virtualHostAdapter,
final org.apache.qpid.server.exchange.Exchange exchange)
{
- super(exchange.getId());
+ super(exchange.getId(), virtualHostAdapter.getTaskExecutor());
_statistics = new ExchangeStatistics();
_vhost = virtualHostAdapter;
_exchange = exchange;
@@ -113,8 +112,8 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
throws AccessControlException, IllegalStateException
{
attributes = new HashMap<String, Object>(attributes);
- String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
- Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
attributes.remove(org.apache.qpid.server.model.Binding.NAME);
attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
@@ -257,7 +256,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == org.apache.qpid.server.model.Binding.class)
{
@@ -369,28 +368,20 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO - Implement
- }
-
- @Override
public Collection<String> getAttributeNames()
{
return AVAILABLE_ATTRIBUTES;
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
- AccessControlException
+ protected boolean setState(State currentState, State desiredState)
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
private class ExchangeStatistics implements Statistics
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java
new file mode 100644
index 0000000000..0fa834bc28
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.security.AccessControlException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Group;
+import org.apache.qpid.server.model.GroupMember;
+import org.apache.qpid.server.model.GroupProvider;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.group.GroupManager;
+import org.apache.qpid.server.security.SecurityManager;
+
+public class GroupProviderAdapter extends AbstractAdapter implements
+ GroupProvider
+{
+ private final GroupManager _groupManager;
+ private final Broker _broker;
+ public GroupProviderAdapter(UUID id, GroupManager groupManager, Broker broker)
+ {
+ super(id, broker.getTaskExecutor());
+
+ if (groupManager == null)
+ {
+ throw new IllegalArgumentException("GroupManager must not be null");
+ }
+ _groupManager = groupManager;
+ _broker = broker;
+ addParent(Broker.class, broker);
+ }
+
+ @Override
+ public String getName()
+ {
+ return _groupManager.getClass().getSimpleName();
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null;
+ }
+
+ @Override
+ public State getActualState()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public void setDurable(boolean durable) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
+ LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ return 0;
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ return NoStatistics.getInstance();
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return GroupProvider.AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if (TYPE.equals(name))
+ {
+ return getName();
+ }
+ else if (CREATED.equals(name))
+ {
+ // TODO
+ }
+ else if (DURABLE.equals(name))
+ {
+ return true;
+ }
+ else if (ID.equals(name))
+ {
+ return getId();
+ }
+ else if (LIFETIME_POLICY.equals(name))
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+ else if (NAME.equals(name))
+ {
+ return getName();
+ }
+ else if (STATE.equals(name))
+ {
+ return State.ACTIVE; // TODO
+ }
+ else if (TIME_TO_LIVE.equals(name))
+ {
+ // TODO
+ }
+ else if (UPDATED.equals(name))
+ {
+ // TODO
+ }
+ return super.getAttribute(name);
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass,
+ Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ if (childClass == Group.class)
+ {
+ String groupName = (String) attributes.get(Group.NAME);
+
+ if (getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName))
+ {
+ _groupManager.createGroup(groupName);
+ return (C) new GroupAdapter(groupName, getTaskExecutor());
+ }
+ else
+ {
+ throw new AccessControlException("Do not have permission" +
+ " to create new group");
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "This group provider does not support creating children of type: "
+ + childClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ {
+ if (clazz == Group.class)
+ {
+ Set<Principal> groups = _groupManager.getGroupPrincipals();
+ Collection<Group> principals = new ArrayList<Group>(groups.size());
+ for (Principal group : groups)
+ {
+ principals.add(new GroupAdapter(group.getName(), getTaskExecutor()));
+ }
+ return (Collection<C>) Collections
+ .unmodifiableCollection(principals);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private SecurityManager getSecurityManager()
+ {
+ return _broker.getSecurityManager();
+ }
+
+ private class GroupAdapter extends AbstractAdapter implements Group
+ {
+ private final String _group;
+
+ public GroupAdapter(String group, TaskExecutor taskExecutor)
+ {
+ super(UUIDGenerator.generateGroupUUID(GroupProviderAdapter.this.getName(), group), taskExecutor);
+ _group = group;
+
+ }
+
+ @Override
+ public String getName()
+ {
+ return _group;
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ throw new IllegalStateException("Names cannot be updated");
+ }
+
+ @Override
+ public State getActualState()
+ {
+ return State.ACTIVE;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ @Override
+ public void setDurable(boolean durable) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new IllegalStateException("Durability cannot be updated");
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
+ LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new IllegalStateException("LifetimePolicy cannot be updated");
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new IllegalStateException("ttl cannot be updated");
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ return NoStatistics.getInstance();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(
+ Class<C> clazz)
+ {
+ if (clazz == GroupMember.class)
+ {
+ Set<Principal> usersInGroup = _groupManager
+ .getUserPrincipalsForGroup(_group);
+ Collection<GroupMember> members = new ArrayList<GroupMember>();
+ for (Principal principal : usersInGroup)
+ {
+ members.add(new GroupMemberAdapter(principal.getName(), getTaskExecutor()));
+ }
+ return (Collection<C>) Collections
+ .unmodifiableCollection(members);
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass,
+ Map<String, Object> attributes,
+ ConfiguredObject... otherParents)
+ {
+ if (childClass == GroupMember.class)
+ {
+ String memberName = (String) attributes.get(GroupMember.NAME);
+
+ if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group))
+ {
+ _groupManager.addUserToGroup(memberName, _group);
+ return (C) new GroupMemberAdapter(memberName, getTaskExecutor());
+ }
+ else
+ {
+ throw new AccessControlException("Do not have permission" +
+ " to add new group member");
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "This group provider does not support creating children of type: "
+ + childClass);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return Group.AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if (ID.equals(name))
+ {
+ return getId();
+ }
+ else if (NAME.equals(name))
+ {
+ return getName();
+ }
+ return super.getAttribute(name);
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ throws IllegalStateTransitionException, AccessControlException
+ {
+ if (desiredState == State.DELETED)
+ {
+ if (getSecurityManager().authoriseGroupOperation(Operation.DELETE, _group))
+ {
+ _groupManager.removeGroup(_group);
+ return true;
+ }
+ else
+ {
+ throw new AccessControlException("Do not have permission to delete group");
+ }
+ }
+
+ return false;
+ }
+
+ private class GroupMemberAdapter extends AbstractAdapter implements
+ GroupMember
+ {
+ private String _memberName;
+
+ public GroupMemberAdapter(String memberName, TaskExecutor taskExecutor)
+ {
+ super(UUIDGenerator.generateGroupMemberUUID(GroupProviderAdapter.this.getName(), _group, memberName), taskExecutor);
+ _memberName = memberName;
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return GroupMember.AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if (ID.equals(name))
+ {
+ return getId();
+ }
+ else if (NAME.equals(name))
+ {
+ return getName();
+ }
+ return super.getAttribute(name);
+ }
+
+ @Override
+ public String getName()
+ {
+ return _memberName;
+ }
+
+ @Override
+ public String setName(String currentName, String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null;
+ }
+
+ @Override
+ public State getActualState()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public void setDurable(boolean durable)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return null;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(LifetimePolicy expected,
+ LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ @Override
+ public long setTimeToLive(long expected, long desired)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ return 0;
+ }
+
+ @Override
+ public Statistics getStatistics()
+ {
+ return NoStatistics.getInstance();
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(
+ Class<C> clazz)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(
+ Class<C> childClass, Map<String, Object> attributes,
+ ConfiguredObject... otherParents)
+ {
+ return null;
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ throws IllegalStateTransitionException,
+ AccessControlException
+ {
+ if (desiredState == State.DELETED)
+ {
+ if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group))
+ {
+ _groupManager.removeUserFromGroup(_memberName, _group);
+ return true;
+ }
+ else
+ {
+ throw new AccessControlException("Do not have permission to remove group member");
+ }
+ }
+ return false;
+ }
+
+ }
+ }
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE)
+ {
+ return true;
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ return true;
+ }
+ // TODO: DELETE state is ignored for now
+ // in case if we need to delete group provider, then we need AuthenticationProvider to be a change listener of it
+ // in order to remove deleted group provider from its group provider list
+ return false;
+ }
+
+ public Set<Principal> getGroupPrincipalsForUser(String username)
+ {
+ return _groupManager.getGroupPrincipalsForUser(username);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java
deleted file mode 100644
index 823d27160b..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/HTTPPortAdapter.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.model.adapter;
-
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHostAlias;
-
-public class HTTPPortAdapter extends AbstractAdapter implements Port
-{
- private final BrokerAdapter _broker;
- private final int _port;
- private final Protocol _protocol;
- private final Transport _transport;
-
- public HTTPPortAdapter(BrokerAdapter brokerAdapter, int port)
- {
- this(brokerAdapter, port, Protocol.HTTP, Transport.TCP);
- }
-
- public HTTPPortAdapter(BrokerAdapter brokerAdapter, int port, Protocol protocol, Transport transport)
- {
- super(UUIDGenerator.generateRandomUUID());
- _broker = brokerAdapter;
- _port = port;
- _protocol = protocol;
- _transport = transport;
- }
-
- @Override
- public String getBindingAddress()
- {
- return "0.0.0.0";
- }
-
- @Override
- public int getPort()
- {
- return _port;
- }
-
- @Override
- public Collection<Transport> getTransports()
- {
- return Collections.singleton(_transport);
- }
-
- @Override
- public void addTransport(Transport transport)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public Transport removeTransport(Transport transport)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public Collection<Protocol> getProtocols()
- {
- return Collections.singleton(_protocol);
- }
-
- @Override
- public void addProtocol(Protocol protocol)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public Protocol removeProtocol(Protocol protocol)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public Collection<VirtualHostAlias> getVirtualHostBindings()
- {
- return Collections.emptySet();
- }
-
- @Override
- public Collection<Connection> getConnections()
- {
- return Collections.emptySet(); // TODO - Implement
- }
-
- @Override
- public String getName()
- {
- return getBindingAddress() + ":" + getPort(); // TODO - Implement
- }
-
- @Override
- public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public State getActualState()
- {
- return State.ACTIVE;
- }
-
- @Override
- public boolean isDurable()
- {
- return false; // TODO - Implement
- }
-
- @Override
- public void setDurable(boolean durable)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException();
- }
-
- @Override
- public LifetimePolicy getLifetimePolicy()
- {
- return LifetimePolicy.PERMANENT;
- }
-
- @Override
- public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public long getTimeToLive()
- {
- return 0; // TODO - Implement
- }
-
- @Override
- public long setTimeToLive(long expected, long desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException(); // TODO - Implement
- }
-
- @Override
- public Statistics getStatistics()
- {
- return NoStatistics.getInstance();
- }
-
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
- {
- if(clazz == Connection.class)
- {
- return (Collection<C>) getConnections();
- }
- else
- {
- return Collections.emptySet();
- }
- }
-
- @Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getAttribute(String name)
- {
- if(ID.equals(name))
- {
- return getId();
- }
- else if(NAME.equals(name))
- {
- return getName();
- }
- else if(STATE.equals(name))
- {
- return getActualState();
- }
- else if(DURABLE.equals(name))
- {
- return isDurable();
- }
- else if(LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if(TIME_TO_LIVE.equals(name))
- {
- return getTimeToLive();
- }
- else if(CREATED.equals(name))
- {
-
- }
- else if(UPDATED.equals(name))
- {
-
- }
- else if(BINDING_ADDRESS.equals(name))
- {
- return getBindingAddress();
- }
- else if(PORT.equals(name))
- {
- return getPort();
- }
- else if(PROTOCOLS.equals(name))
- {
- return getProtocols();
- }
- else if(TRANSPORTS.equals(name))
- {
- return getTransports();
- }
-
- return super.getAttribute(name); //TODO - Implement
- }
-
- @Override
- public Collection<String> getAttributeNames()
- {
- return AVAILABLE_ATTRIBUTES;
- }
-
- @Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO - Implement
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java
new file mode 100644
index 0000000000..113d895e62
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.KeyStore;
+
+public class KeyStoreAdapter extends AbstractKeyStoreAdapter implements KeyStore
+{
+
+ public KeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes)
+ {
+ super(id, broker, attributes);
+ if (attributes.get(CERTIFICATE_ALIAS) != null)
+ {
+ changeAttribute(CERTIFICATE_ALIAS, null, attributes.get(CERTIFICATE_ALIAS));
+ }
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
index 7653fcc9b9..c4a531c923 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
@@ -21,7 +21,16 @@
package org.apache.qpid.server.model.adapter;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -30,119 +39,82 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.net.InetSocketAddress;
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
public class PortAdapter extends AbstractAdapter implements Port
{
- private final BrokerAdapter _broker;
- private final QpidAcceptor _acceptor;
- private final InetSocketAddress _address;
- private final Collection<Protocol> _protocols;
-
- public PortAdapter(BrokerAdapter brokerAdapter, QpidAcceptor acceptor, InetSocketAddress address)
- {
- super(UUIDGenerator.generateRandomUUID());
- _broker = brokerAdapter;
- _acceptor = acceptor;
- _address = address;
- List<Protocol> protocols = new ArrayList<Protocol>();
+ private final Broker _broker;
+ private AuthenticationProvider _authenticationProvider;
- for(AmqpProtocolVersion pv : _acceptor.getSupported())
- {
- switch(pv)
- {
- case v0_8:
- protocols.add(Protocol.AMQP_0_8);
- break;
- case v0_9:
- protocols.add(Protocol.AMQP_0_9);
- break;
- case v0_9_1:
- protocols.add(Protocol.AMQP_0_9_1);
- break;
- case v0_10:
- protocols.add(Protocol.AMQP_0_10);
- break;
- case v1_0_0:
- protocols.add(Protocol.AMQP_1_0);
- break;
- }
- }
+ /*
+ * TODO register PortAceptor as a listener. For supporting multiple
+ * protocols on the same port we need to introduce a special entity like
+ * PortAceptor which will be responsible for port binding/unbinding
+ */
+ public PortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaults, TaskExecutor taskExecutor)
+ {
+ super(id, defaults, attributes, taskExecutor);
+ _broker = broker;
- _protocols = Collections.unmodifiableCollection(protocols);
+ addParent(Broker.class, broker);
}
@Override
public String getBindingAddress()
{
- return _address.getHostName();
+ return (String)getAttribute(BINDING_ADDRESS);
}
@Override
public int getPort()
{
- return _address.getPort();
+ return (Integer)getAttribute(PORT);
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<Transport> getTransports()
{
- switch (_acceptor.getTransport())
- {
- case TCP:
- return Collections.singleton(Transport.TCP);
- case SSL:
- return Collections.singleton(Transport.SSL);
- }
-
- return null; // TODO - Implement
+ return (Collection<Transport>)getAttribute(TRANSPORTS);
}
@Override
public void addTransport(Transport transport)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public Transport removeTransport(Transport transport)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<Protocol> getProtocols()
{
- return _protocols;
+ return (Collection<Protocol>)getAttribute(PROTOCOLS);
}
@Override
public void addProtocol(Protocol protocol)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public Protocol removeProtocol(Protocol protocol)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
@@ -165,31 +137,36 @@ public class PortAdapter extends AbstractAdapter implements Port
@Override
public Collection<Connection> getConnections()
{
- return null; // TODO - Implement
+ return null;
}
@Override
public String getName()
{
- return getBindingAddress() + ":" + getPort(); // TODO - Implement
+ return (String)getAttribute(NAME);
}
@Override
public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public State getActualState()
{
- return State.ACTIVE;
+ State state = (State)super.getAttribute(STATE);
+ if (state == null)
+ {
+ return State.ACTIVE;
+ }
+ return state;
}
@Override
public boolean isDurable()
{
- return false; // TODO - Implement
+ return false;
}
@Override
@@ -209,20 +186,20 @@ public class PortAdapter extends AbstractAdapter implements Port
public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public long getTimeToLive()
{
- return 0; // TODO - Implement
+ return 0;
}
@Override
public long setTimeToLive(long expected, long desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
@@ -257,10 +234,6 @@ public class PortAdapter extends AbstractAdapter implements Port
{
return getId();
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
return getActualState();
@@ -285,36 +258,54 @@ public class PortAdapter extends AbstractAdapter implements Port
{
}
- else if(BINDING_ADDRESS.equals(name))
- {
- return getBindingAddress();
- }
- else if(PORT.equals(name))
+ return super.getAttribute(name);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.DELETED)
{
- return getPort();
+ return true;
}
- else if(PROTOCOLS.equals(name))
+ else if (desiredState == State.ACTIVE)
{
- return getProtocols();
+ onActivate();
+ return true;
}
- else if(TRANSPORTS.equals(name))
+ else if (desiredState == State.STOPPED)
{
- return getTransports();
+ onStop();
+ return true;
}
+ return false;
+ }
- return super.getAttribute(name); //TODO - Implement
+ protected void onActivate()
+ {
+ // no-op: expected to be overridden by subclass
}
- @Override
- public Collection<String> getAttributeNames()
+ protected void onStop()
{
- return AVAILABLE_ATTRIBUTES;
+ // no-op: expected to be overridden by subclass
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public AuthenticationProvider getAuthenticationProvider()
+ {
+ return _authenticationProvider;
+ }
+
+ public void setAuthenticationProvider(AuthenticationProvider authenticationProvider)
{
- return super.setAttribute(name, expected, desired); //TODO - Implement
+ _authenticationProvider = authenticationProvider;
}
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
new file mode 100644
index 0000000000..b7441b9f3b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Protocol.ProtocolType;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.util.MapValueConverter;
+
+public class PortFactory
+{
+ public static final int DEFAULT_AMQP_SEND_BUFFER_SIZE = 262144;
+ public static final int DEFAULT_AMQP_RECEIVE_BUFFER_SIZE = 262144;
+ public static final boolean DEFAULT_AMQP_NEED_CLIENT_AUTH = false;
+ public static final boolean DEFAULT_AMQP_WANT_CLIENT_AUTH = false;
+ public static final boolean DEFAULT_AMQP_TCP_NO_DELAY = true;
+ public static final String DEFAULT_AMQP_BINDING = "*";
+ public static final Transport DEFAULT_TRANSPORT = Transport.TCP;
+
+ private final Collection<Protocol> _defaultProtocols;
+
+ public PortFactory()
+ {
+ Set<Protocol> defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1,
+ Protocol.AMQP_0_10, Protocol.AMQP_1_0);
+ String excludedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES);
+ if (excludedProtocols != null)
+ {
+ String[] excludes = excludedProtocols.split(",");
+ for (String exclude : excludes)
+ {
+ Protocol protocol = Protocol.valueOf(exclude);
+ defaultProtocols.remove(protocol);
+ }
+ }
+ String includedProtocols = System.getProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES);
+ if (includedProtocols != null)
+ {
+ String[] includes = includedProtocols.split(",");
+ for (String include : includes)
+ {
+ Protocol protocol = Protocol.valueOf(include);
+ defaultProtocols.add(protocol);
+ }
+ }
+ _defaultProtocols = Collections.unmodifiableCollection(defaultProtocols);
+ }
+
+ public Port createPort(UUID id, Broker broker, Map<String, Object> objectAttributes)
+ {
+ Map<String, Object> attributes = retrieveAttributes(objectAttributes);
+
+ final Port port;
+ Map<String, Object> defaults = new HashMap<String, Object>();
+ defaults.put(Port.TRANSPORTS, Collections.singleton(DEFAULT_TRANSPORT));
+ Object portValue = attributes.get(Port.PORT);
+ if (portValue == null)
+ {
+ throw new IllegalConfigurationException("Port attribute is not specified for port: " + attributes);
+ }
+ if (isAmqpProtocol(attributes))
+ {
+ Object binding = attributes.get(Port.BINDING_ADDRESS);
+ if (binding == null)
+ {
+ binding = DEFAULT_AMQP_BINDING;
+ defaults.put(Port.BINDING_ADDRESS, DEFAULT_AMQP_BINDING);
+ }
+ defaults.put(Port.NAME, binding + ":" + portValue);
+ defaults.put(Port.PROTOCOLS, _defaultProtocols);
+ defaults.put(Port.TCP_NO_DELAY, DEFAULT_AMQP_TCP_NO_DELAY);
+ defaults.put(Port.WANT_CLIENT_AUTH, DEFAULT_AMQP_WANT_CLIENT_AUTH);
+ defaults.put(Port.NEED_CLIENT_AUTH, DEFAULT_AMQP_NEED_CLIENT_AUTH);
+ defaults.put(Port.RECEIVE_BUFFER_SIZE, DEFAULT_AMQP_RECEIVE_BUFFER_SIZE);
+ defaults.put(Port.SEND_BUFFER_SIZE, DEFAULT_AMQP_SEND_BUFFER_SIZE);
+ port = new AmqpPortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
+ }
+ else
+ {
+ @SuppressWarnings("unchecked")
+ Collection<Protocol> protocols = (Collection<Protocol>)attributes.get(Port.PROTOCOLS);
+ if (protocols.size() > 1)
+ {
+ throw new IllegalConfigurationException("Only one protocol can be used on non AMQP port");
+ }
+ Protocol protocol = protocols.iterator().next();
+ defaults.put(Port.NAME, portValue + "-" + protocol.name());
+ port = new PortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
+ }
+ return port;
+ }
+
+ private Map<String, Object> retrieveAttributes(Map<String, Object> objectAttributes)
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>(objectAttributes);
+
+ if (objectAttributes.containsKey(Port.PROTOCOLS))
+ {
+ final Set<Protocol> protocolSet = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, objectAttributes, Protocol.class);
+ attributes.put(Port.PROTOCOLS, protocolSet);
+ }
+
+ if (objectAttributes.containsKey(Port.TRANSPORTS))
+ {
+ final Set<Transport> transportSet = MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, objectAttributes,
+ Transport.class);
+ attributes.put(Port.TRANSPORTS, transportSet);
+ }
+
+ if (objectAttributes.containsKey(Port.PORT))
+ {
+ Integer port = MapValueConverter.getIntegerAttribute(Port.PORT, objectAttributes);
+ attributes.put(Port.PORT, port);
+ }
+
+ if (objectAttributes.containsKey(Port.TCP_NO_DELAY))
+ {
+ boolean tcpNoDelay = MapValueConverter.getBooleanAttribute(Port.TCP_NO_DELAY, objectAttributes);
+ attributes.put(Port.TCP_NO_DELAY, tcpNoDelay);
+ }
+
+ if (objectAttributes.containsKey(Port.RECEIVE_BUFFER_SIZE))
+ {
+ int receiveBufferSize = MapValueConverter.getIntegerAttribute(Port.RECEIVE_BUFFER_SIZE, objectAttributes);
+ attributes.put(Port.RECEIVE_BUFFER_SIZE, receiveBufferSize);
+ }
+
+ if (objectAttributes.containsKey(Port.SEND_BUFFER_SIZE))
+ {
+ int sendBufferSize = MapValueConverter.getIntegerAttribute(Port.SEND_BUFFER_SIZE, objectAttributes);
+ attributes.put(Port.SEND_BUFFER_SIZE, sendBufferSize);
+ }
+
+ if (objectAttributes.containsKey(Port.NEED_CLIENT_AUTH))
+ {
+ boolean needClientAuth = MapValueConverter.getBooleanAttribute(Port.NEED_CLIENT_AUTH, objectAttributes);
+ attributes.put(Port.NEED_CLIENT_AUTH, needClientAuth);
+ }
+
+ if (objectAttributes.containsKey(Port.WANT_CLIENT_AUTH))
+ {
+ boolean wantClientAuth = MapValueConverter.getBooleanAttribute(Port.WANT_CLIENT_AUTH, objectAttributes);
+ attributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth);
+ }
+
+ if (objectAttributes.containsKey(Port.BINDING_ADDRESS))
+ {
+ String binding = MapValueConverter.getStringAttribute(Port.BINDING_ADDRESS, objectAttributes);
+ attributes.put(Port.BINDING_ADDRESS, binding);
+ }
+
+ if (objectAttributes.containsKey(Port.STATE))
+ {
+ State state = MapValueConverter.getEnumAttribute(State.class, Port.STATE, objectAttributes);
+ attributes.put(Port.STATE, state);
+ }
+ return attributes;
+ }
+
+ private boolean isAmqpProtocol(Map<String, Object> portAttributes)
+ {
+ @SuppressWarnings("unchecked")
+ Set<Protocol> protocols = (Set<Protocol>) portAttributes.get(Port.PROTOCOLS);
+ if (protocols == null || protocols.isEmpty())
+ {
+ // defaulting to AMQP if protocol is not specified
+ return true;
+ }
+
+ Set<ProtocolType> protocolTypes = new HashSet<ProtocolType>();
+ for (Protocol protocolObject : protocols)
+ {
+ protocolTypes.add(protocolObject.getProtocolType());
+ }
+
+ if (protocolTypes.size() > 1)
+ {
+ throw new IllegalConfigurationException("Found different protocol types '" + protocolTypes
+ + "' on port configuration: " + portAttributes);
+ }
+
+ return protocolTypes.contains(ProtocolType.AMQP);
+ }
+
+ public Collection<Protocol> getDefaultProtocols()
+ {
+ return _defaultProtocols;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 78f6d38d93..f3ddf32e5a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.MapValueConverter;
final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
{
@@ -50,15 +51,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>();
static
{
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
@@ -78,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
{
- super(queue.getId());
+ super(queue.getId(), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
@@ -205,47 +207,47 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
@Override
- public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
try
{
if(ALERT_REPEAT_GAP.equals(name))
{
_queue.setMinimumAlertRepeatGap((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
{
_queue.setMaximumMessageAge((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
{
_queue.setMaximumMessageSize((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
{
_queue.setMaximumQueueDepth((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
{
_queue.setMaximumMessageCount((Long)desired);
- return desired;
+ return true;
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
_queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange());
- return desired;
+ return true;
}
else if(EXCLUSIVE.equals(name))
{
Boolean exclusiveFlag = (Boolean) desired;
_queue.setExclusive(exclusiveFlag);
- return desired;
+ return true;
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -266,7 +268,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
{
_queue.setMaximumDeliveryCount((Integer)desired);
- return desired;
+ return true;
}
else if(NO_LOCAL.equals(name))
{
@@ -279,12 +281,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
{
_queue.setCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
_queue.setFlowResumeCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_STOPPED.equals(name))
{
@@ -301,10 +303,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if (DESCRIPTION.equals(name))
{
_queue.setDescription((String) desired);
- return desired;
+ return true;
}
- return super.setAttribute(name, expected, desired);
+ return super.changeAttribute(name, expected, desired);
}
finally
{
@@ -495,8 +497,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
throws AccessControlException, IllegalStateException
{
attributes = new HashMap<String, Object>(attributes);
- String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
- Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
attributes.remove(org.apache.qpid.server.model.Binding.NAME);
attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
@@ -508,7 +510,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == org.apache.qpid.server.model.Binding.class)
{
@@ -712,15 +714,15 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException,
AccessControlException
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index d802697d67..2fffdb32f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
final class SessionAdapter extends AbstractAdapter implements Session
@@ -44,9 +45,9 @@ final class SessionAdapter extends AbstractAdapter implements Session
private AMQSessionModel _session;
private SessionStatistics _statistics;
- public SessionAdapter(final AMQSessionModel session)
+ public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
- super(UUIDGenerator.generateRandomUUID());
+ super(UUIDGenerator.generateRandomUUID(), taskExecutor);
_session = session;
_statistics = new SessionStatistics();
}
@@ -141,13 +142,6 @@ final class SessionAdapter extends AbstractAdapter implements Session
return super.getAttribute(name); //TODO - Implement
}
- @Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO - Implement
- }
-
public Statistics getStatistics()
{
return _statistics;
@@ -237,4 +231,11 @@ final class SessionAdapter extends AbstractAdapter implements Session
return null; // TODO - Implement
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO : add state management
+ return false;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java
new file mode 100644
index 0000000000..bdffe605ec
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/TrustStoreAdapter.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.TrustStore;
+
+public class TrustStoreAdapter extends AbstractKeyStoreAdapter implements TrustStore
+{
+ public TrustStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes)
+ {
+ super(id, broker, attributes);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 35838e51d2..1d50be279f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model.adapter;
+import java.io.File;
import java.security.AccessControlException;
import java.security.Principal;
import java.util.ArrayList;
@@ -31,17 +32,27 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueType;
import org.apache.qpid.server.model.State;
@@ -49,22 +60,38 @@ import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
+public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
QueueRegistry.RegistryChangeListener,
IConnectionRegistry.RegistryChangeListener
{
- private final org.apache.qpid.server.virtualhost.VirtualHost _virtualHost;
+ @SuppressWarnings("serial")
+ public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{
+ put(NAME, String.class);
+ put(STORE_PATH, String.class);
+ put(STORE_TYPE, String.class);
+ put(CONFIG_PATH, String.class);
+ put(STATE, State.class);
+ }});
+
+ private org.apache.qpid.server.virtualhost.VirtualHost _virtualHost;
private final Map<AMQConnectionModel, ConnectionAdapter> _connectionAdapters =
new HashMap<AMQConnectionModel, ConnectionAdapter>();
@@ -74,37 +101,52 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
private final Map<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter> _exchangeAdapters =
new HashMap<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter>();
-
- private final StatisticsAdapter _statistics;
-
- private final BrokerAdapter _broker;
-
+ private StatisticsAdapter _statistics;
+ private final Broker _broker;
private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
+ private StatisticsGatherer _brokerStatisticsGatherer;
-
- VirtualHostAdapter(BrokerAdapter brokerAdapter,
- final org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+ public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor)
{
- super(virtualHost.getId());
- _broker = brokerAdapter;
- _virtualHost = virtualHost;
- _statistics = new VirtualHostStatisticsAdapter(virtualHost);
- virtualHost.getQueueRegistry().addRegistryChangeListener(this);
- populateQueues();
- virtualHost.getExchangeRegistry().addRegistryChangeListener(this);
- populateExchanges();
- virtualHost.getConnectionRegistry().addRegistryChangeListener(this);
- populateConnections();
-
+ super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor);
+ validateAttributes();
+ _broker = broker;
+ _brokerStatisticsGatherer = brokerStatisticsGatherer;
+ addParent(Broker.class, broker);
+ }
+ private void validateAttributes()
+ {
+ String name = getName();
+ if (name == null || "".equals(name.trim()))
+ {
+ throw new IllegalConfigurationException("Virtual host name must be specified");
+ }
- for(Port port :_broker.getPorts())
+ String configurationFile = (String) getAttribute(CONFIG_PATH);
+ String storePath = (String) getAttribute(STORE_PATH);
+ String storeType = (String) getAttribute(STORE_TYPE);
+ boolean invalidAttributes = false;
+ if (configurationFile == null)
+ {
+ if (storePath == null || storeType == null)
+ {
+ invalidAttributes = true;
+ }
+ }
+ else
+ {
+ if (storePath != null || storeType != null)
+ {
+ invalidAttributes = true;
+ }
+ }
+ if (invalidAttributes)
{
- _aliases.add(new VirtualHostAliasAdapter(this, port));
+ throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or both 'storePath' and 'storeType' attributes");
}
}
-
private void populateExchanges()
{
Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges =
@@ -126,37 +168,22 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
private void populateQueues()
{
Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues();
-
- synchronized(_queueAdapters)
- {
- for(AMQQueue queue : actualQueues)
- {
- if(!_queueAdapters.containsKey(queue))
- {
- _queueAdapters.put(queue, new QueueAdapter(this,queue));
- }
- }
- }
- }
-
- private void populateConnections()
- {
-
- List<AMQConnectionModel> actualConnections = _virtualHost.getConnectionRegistry().getConnections();
-
- synchronized(_connectionAdapters)
+ if ( actualQueues != null )
{
- for(AMQConnectionModel conn : actualConnections)
+ synchronized(_queueAdapters)
{
- if(!_connectionAdapters.containsKey(conn))
+ for(AMQQueue queue : actualQueues)
{
- _connectionAdapters.put(conn, new ConnectionAdapter(conn));
+ if(!_queueAdapters.containsKey(queue))
+ {
+ _queueAdapters.put(queue, new QueueAdapter(this, queue));
+ }
}
}
}
-
}
+ @Override
public String getReplicationGroupName()
{
return null; //TODO
@@ -198,12 +225,12 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
attributes = new HashMap<String, Object>(attributes);
- String name = getStringAttribute(Exchange.NAME, attributes, null);
- State state = getEnumAttribute(State.class, Exchange.STATE, attributes, State.ACTIVE);
- boolean durable = getBooleanAttribute(Exchange.DURABLE, attributes, false);
- LifetimePolicy lifetime = getEnumAttribute(LifetimePolicy.class, Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
- String type = getStringAttribute(Exchange.TYPE, attributes, null);
- long ttl = getLongAttribute(Exchange.TIME_TO_LIVE, attributes, 0l);
+ String name = MapValueConverter.getStringAttribute(Exchange.NAME, attributes, null);
+ State state = MapValueConverter.getEnumAttribute(State.class, Exchange.STATE, attributes, State.ACTIVE);
+ boolean durable = MapValueConverter.getBooleanAttribute(Exchange.DURABLE, attributes, false);
+ LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
+ String type = MapValueConverter.getStringAttribute(Exchange.TYPE, attributes, null);
+ long ttl = MapValueConverter.getLongAttribute(Exchange.TIME_TO_LIVE, attributes, 0l);
attributes.remove(Exchange.NAME);
attributes.remove(Exchange.STATE);
@@ -266,7 +293,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
if (attributes.containsKey(Queue.TYPE))
{
- String typeAttribute = getStringAttribute(Queue.TYPE, attributes, null);
+ String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null);
QueueType queueType = null;
try
{
@@ -289,12 +316,12 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
throw new IllegalArgumentException("Sort key is not specified for sorted queue");
}
}
- String name = getStringAttribute(Queue.NAME, attributes, null);
- State state = getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
- boolean durable = getBooleanAttribute(Queue.DURABLE, attributes, false);
- LifetimePolicy lifetime = getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
- long ttl = getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l);
- boolean exclusive= getBooleanAttribute(Queue.EXCLUSIVE, attributes, false);
+ String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null);
+ State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
+ boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+ LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
+ long ttl = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l);
+ boolean exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false);
attributes.remove(Queue.NAME);
attributes.remove(Queue.STATE);
@@ -328,11 +355,10 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
String owner = null;
if(exclusive)
{
- Set<Principal> principals =
- SecurityManager.getThreadSubject().getPrincipals();
- if(principals != null && !principals.isEmpty())
+ Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject());
+ if(authenticatedPrincipal != null)
{
- owner = principals.iterator().next().getName();
+ owner = authenticatedPrincipal.getName();
}
}
try
@@ -370,7 +396,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
public String getName()
{
- return _virtualHost.getName();
+ return (String)getAttribute(NAME);
}
public String setName(final String currentName, final String desiredName)
@@ -379,9 +405,36 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
throw new IllegalStateException();
}
+ @Override
public State getActualState()
{
- return getDesiredState();
+ if (_virtualHost == null)
+ {
+ State state = (State)super.getAttribute(STATE);
+ if (state == null)
+ {
+ return State.INITIALISING;
+ }
+ return state;
+ }
+ else
+ {
+ org.apache.qpid.server.virtualhost.State implementationState = _virtualHost.getState();
+ switch(implementationState)
+ {
+ case INITIALISING:
+ return State.INITIALISING;
+ case ACTIVE:
+ return State.ACTIVE;
+ case PASSIVE:
+ return State.QUIESCED;
+ case STOPPED:
+ return State.STOPPED;
+ default:
+ // unexpected state
+ return null;
+ }
+ }
}
public boolean isDurable()
@@ -448,7 +501,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == Exchange.class)
{
@@ -548,7 +601,7 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
if(!_connectionAdapters.containsKey(connection))
{
- adapter = new ConnectionAdapter(connection);
+ adapter = new ConnectionAdapter(connection, getTaskExecutor());
_connectionAdapters.put(connection, adapter);
}
@@ -709,13 +762,9 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
return getId();
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
- return State.ACTIVE;
+ return getActualState();
}
else if(DURABLE.equals(name))
{
@@ -737,10 +786,19 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
// TODO
}
- else if(SUPPORTED_EXCHANGE_TYPES.equals(name))
+ else if (_virtualHost != null)
+ {
+ return getAttributeFromVirtualHostImplementation(name);
+ }
+ return super.getAttribute(name);
+ }
+
+ private Object getAttributeFromVirtualHostImplementation(String name)
+ {
+ if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
List<String> types = new ArrayList<String>();
- for(ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes())
+ for(@SuppressWarnings("rawtypes") ExchangeType type : _virtualHost.getExchangeFactory().getRegisteredTypes())
{
types.add(type.getName().asString());
}
@@ -754,10 +812,6 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
return _virtualHost.getConfiguration().isDeadLetterQueueEnabled();
}
- else if(FEDERATION_TAG.equals(name))
- {
- return _virtualHost.getFederationTag();
- }
else if(HOUSEKEEPING_CHECK_PERIOD.equals(name))
{
return _virtualHost.getConfiguration().getHousekeepingCheckPeriod();
@@ -778,9 +832,9 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
return _virtualHost.getMessageStore().getStoreType();
}
- else if(STORE_CONFIGURATION.equals(name))
+ else if(STORE_PATH.equals(name))
{
- // TODO
+ return _virtualHost.getMessageStore().getStoreLocation();
}
else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name))
{
@@ -822,13 +876,6 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO - Implement
- }
-
- @Override
public Collection<String> getAttributeNames()
{
return AVAILABLE_ATTRIBUTES;
@@ -889,4 +936,111 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE)
+ {
+ activate();
+ return true;
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ if (_virtualHost != null)
+ {
+ try
+ {
+ _virtualHost.close();
+ }
+ finally
+ {
+ _broker.getVirtualHostRegistry().unregisterVirtualHost(_virtualHost);
+ }
+ }
+ return true;
+ }
+ else if (desiredState == State.DELETED)
+ {
+ //TODO: add ACL check to authorize the operation
+ if (_virtualHost != null && _virtualHost.getState() == org.apache.qpid.server.virtualhost.State.ACTIVE)
+ {
+ setDesiredState(currentState, State.STOPPED);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void activate()
+ {
+ VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry();
+ String virtualHostName = getName();
+ try
+ {
+ VirtualHostConfiguration configuration = createVirtualHostConfiguration(virtualHostName);
+ _virtualHost = new VirtualHostImpl(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create virtual host " + virtualHostName, e);
+ }
+
+ virtualHostRegistry.registerVirtualHost(_virtualHost);
+
+ _statistics = new VirtualHostStatisticsAdapter(_virtualHost);
+ _virtualHost.getQueueRegistry().addRegistryChangeListener(this);
+ populateQueues();
+ _virtualHost.getExchangeRegistry().addRegistryChangeListener(this);
+ populateExchanges();
+ _virtualHost.getConnectionRegistry().addRegistryChangeListener(this);
+
+ synchronized(_aliases)
+ {
+ for(Port port :_broker.getPorts())
+ {
+ if (Protocol.hasAmqpProtocol(port.getProtocols()))
+ {
+ _aliases.add(new VirtualHostAliasAdapter(this, port));
+ }
+ }
+ }
+ }
+
+ private VirtualHostConfiguration createVirtualHostConfiguration(String virtualHostName) throws ConfigurationException
+ {
+ VirtualHostConfiguration configuration;
+ String configurationFile = (String)getAttribute(CONFIG_PATH);
+ if (configurationFile == null)
+ {
+ final MyConfiguration basicConfiguration = new MyConfiguration();
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.addProperty("store.type", (String)getAttribute(STORE_TYPE));
+ config.addProperty("store.environment-path", (String)getAttribute(STORE_PATH));
+ basicConfiguration.addConfiguration(config);
+
+ CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
+ compositeConfiguration.addConfiguration(new SystemConfiguration());
+ compositeConfiguration.addConfiguration(basicConfiguration);
+ configuration = new VirtualHostConfiguration(virtualHostName, compositeConfiguration , _broker);
+ }
+ else
+ {
+ configuration = new VirtualHostConfiguration(virtualHostName, new File(configurationFile) , _broker);
+ }
+ return configuration;
+ }
+
+ @Override
+ public SecurityManager getSecurityManager()
+ {
+ return _virtualHost.getSecurityManager();
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _virtualHost.getMessageStore();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
index 367d1ff518..91b705b004 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
@@ -43,7 +43,7 @@ public class VirtualHostAliasAdapter extends AbstractAdapter implements Virtual
public VirtualHostAliasAdapter(VirtualHostAdapter virtualHostAdapter, Port port)
{
- super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName()));
+ super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName()), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
_port = port;
}
@@ -140,4 +140,11 @@ public class VirtualHostAliasAdapter extends AbstractAdapter implements Virtual
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO: state is not supported at the moment
+ return false;
+ }
}