summaryrefslogtreecommitdiff
path: root/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration')
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerAlreadyConnectedException.java53
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionData.java280
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java136
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionException.java42
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java485
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ConfigurationException.java51
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java240
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/IParser.java44
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java64
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java249
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java54
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownAccessCodeException.java53
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownBrokerException.java43
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownTypeCodeException.java53
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java109
15 files changed, 1956 insertions, 0 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerAlreadyConnectedException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerAlreadyConnectedException.java
new file mode 100644
index 0000000000..f23bf9d25e
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerAlreadyConnectedException.java
@@ -0,0 +1,53 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.management.configuration;
+
+/**
+ * Thrown when an attempt is made in order to connect QMan with an already connected broker.
+ *
+ * @author Andrea Gazzarini
+ */
+public class BrokerAlreadyConnectedException extends Exception {
+
+ private static final long serialVersionUID = -5082431738056504669L;
+
+ private BrokerConnectionData _connectionData;
+
+ /**
+ * Builds a new exception with the given data.
+ *
+ * @param connectionData the broker connection data.
+ */
+ public BrokerAlreadyConnectedException(BrokerConnectionData connectionData) {
+ this._connectionData = connectionData;
+ }
+
+ /**
+ * Returns the connection data of the connected broker.
+ *
+ * @return the connection data of the connected broker.
+ */
+ public BrokerConnectionData getBrokerConnectionData()
+ {
+ return _connectionData;
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionData.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionData.java
new file mode 100644
index 0000000000..a64659b17c
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionData.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+/**
+ * Value object which is holding connection data for a specific broker.
+ *
+ * @author Andrea Gazzarini
+ */
+public class BrokerConnectionData
+{
+ private String _host;
+ private int _port;
+ private String _virtualHost;
+ private String _username;
+ private String _password;
+ private int _maxPoolCapacity;
+ private int _initialPoolCapacity;
+ private long _maxWaitTimeout;
+
+ /**
+ * Builds a connection data with the given parameters.
+ *
+ * @param host the hostname where the broker is running.
+ * @param port the port where the broker is running.
+ * @param username the username for connecting with the broker.
+ * @param password the password for connecting with the broker.
+ * @param virtualHost the virtual host.
+ * @param initialPoolCapacity the number of connections that must be immediately opened.
+ * @param maxPoolCapacity the maximum number of opened connection.
+ * @param maxWaitTimeout the maximum amount of time that a client will wait for obtaining a connection.
+ */
+ public BrokerConnectionData(
+ String host,
+ int port,
+ String virtualHost,
+ String username,
+ String password,
+ int initialPoolCapacity,
+ int maxPoolCapacity,
+ long waitTimeout) {
+
+ this._host = host;
+ this._port = port;
+ this._virtualHost = virtualHost;
+ this._username = username;
+ this._password = password;
+ _maxPoolCapacity = maxPoolCapacity;
+ _initialPoolCapacity = initialPoolCapacity;
+ _maxWaitTimeout = waitTimeout;
+ }
+
+ /**
+ * Builds a new empty broker connection data object.
+ */
+ BrokerConnectionData()
+ {
+ }
+
+ /**
+ * Sets the value of host property for this connection data.
+ *
+ * @param host the host name.
+ */
+ void setHost (String host)
+ {
+ this._host = host;
+ }
+
+ /**
+ * Sets the value of port property for this connection data.
+ *
+ * @param port the port.
+ */
+ void setPort (String port)
+ {
+ this._port = Integer.parseInt(port);
+ }
+
+ /**
+ * Sets the value of virtual host property for this connection data.
+ *
+ * @param virtualHost the virtual host.
+ */
+ void setVirtualHost (String virtualHost)
+ {
+ this._virtualHost = virtualHost;
+ }
+
+ /**
+ * Sets the value of username property for this connection data.
+ *
+ * @param username the username.
+ */
+ void setUsername(String username)
+ {
+ this._username = username;
+ }
+
+ /**
+ * Sets the value of password property for this connection data.
+ *
+ * @param password the password.
+ */
+ void setPassword(String password)
+ {
+ this._password = password;
+ }
+
+ /**
+ * Returns the value of the host property.
+ *
+ * @return the value of the host property.
+ */
+ public String getHost ()
+ {
+ return _host;
+ }
+
+ /**
+ * Returns the value of the port property.
+ *
+ * @return the value of the port property.
+ */
+ public int getPort ()
+ {
+ return _port;
+ }
+
+ /**
+ * Returns the value of the virtual host property.
+ *
+ * @return the value of the virtual host property.
+ */
+ public String getVirtualHost ()
+ {
+ return _virtualHost;
+ }
+
+ /**
+ * Returns the value of the username property.
+ *
+ * @return the value of the username property.
+ */
+ public String getUsername ()
+ {
+ return _username;
+ }
+
+ /**
+ * Returns the value of the password property.
+ *
+ * @return the value of the password property.
+ */
+ public String getPassword ()
+ {
+ return _password;
+ }
+
+ // sofia:5663@pippo/sung1
+ @Override
+ public String toString ()
+ {
+ return new StringBuilder()
+ .append(_host)
+ .append(':')
+ .append(_port)
+ .append('@')
+ .append(_virtualHost)
+ .toString();
+ }
+
+ /**
+ * Sets the max number of allowed connections that can be opened.
+ *
+ * @param value the max number of allowed connections that can be opened.
+ * @throws NumberFormatException if the given value is not a valid integer.
+ */
+ public void setMaxPoolCapacity (String value)
+ {
+ _maxPoolCapacity = Integer.parseInt(value);
+ }
+
+ /**
+ * Sets the max wait timeout for retrieving an available connections from the pool.
+ *
+ * @param value the max wait timeout for retrieving an available connections from the pool..
+ * @throws NumberFormatException if the given value is not a valid long.
+ */
+ public void setMaxWaitTimeout (String value)
+ {
+ this._maxWaitTimeout = Long.parseLong(value);
+ }
+
+ /**
+ * Returns the max number of allowed connections that can be opened.
+ *
+ * @return the max number of allowed connections that can be opened.
+ */
+ public int getMaxPoolCapacity ()
+ {
+ return _maxPoolCapacity;
+ }
+
+ /**
+ * Returns the max wait timeout for retrieving an available connections from the pool.
+ *
+ * @return the max wait timeout for retrieving an available connections from the pool.
+ */
+ public long getMaxWaitTimeout ()
+ {
+ return _maxWaitTimeout;
+ }
+
+ /**
+ * Sets the initial connection pool capacity.
+ *
+ * @param capacity the initial connection pool capacity.
+ */
+ public void setInitialPoolCapacity (String capacity)
+ {
+ _initialPoolCapacity = Integer.parseInt(capacity);
+ }
+
+ /**
+ * Returns the initial connection pool capacity.
+ *
+ * @return the initial connection pool capacity.
+ */
+ public int getInitialPoolCapacity ()
+ {
+ return _initialPoolCapacity;
+ }
+
+ @Override
+ public boolean equals(Object object)
+ {
+ if(object instanceof BrokerConnectionData)
+ {
+ try
+ {
+ BrokerConnectionData connectionData = (BrokerConnectionData) object;
+ return (_host.equals(connectionData._host) )
+ && (_port == connectionData._port)
+ && (_virtualHost.equals(connectionData._virtualHost));
+ }
+ catch (Exception exception)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return _host.hashCode()+_port+_virtualHost.hashCode();
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
new file mode 100644
index 0000000000..39981dc7cb
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.UUID;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Parser used for building broker connection data settings.
+ * The corresponding section on the configuration file is :
+ *
+ <broker>
+ <host>192.168.148.131</host>
+ <port>5672</port>
+ <virtual-host>test</virtual-host>
+ <user>guest</user>
+ <password>guest</password>
+ <max-pool-capacity>4</max-pool-capacity>
+ <initial-pool-capacity>4</initial-pool-capacity>
+ <max-wait-timeout>-1</max-wait-timeout>
+ </broker>
+ *
+ * @author Andrea Gazzarini
+ */
+class BrokerConnectionDataParser implements IParser
+{
+ private final static Logger LOGGER = Logger.get(Configuration.class);
+ private BrokerConnectionData _connectionData = new BrokerConnectionData();
+ private String _currentValue;
+
+ /**
+ * Callback : the given value is the text content of the current node.
+ */
+ public void setCurrrentAttributeValue (String value)
+ {
+ this._currentValue = value;
+ }
+
+ /**
+ * Callback: each time the end of an element is reached this method is called.
+ * It's here that the built mapping is injected into the configuration.
+ * <broker>
+ <host>192.168.61.130</host>
+ <port>5673</port>
+ <virtual-host>test</virtual-host>
+ <user>andrea</user>
+ <password>andrea</password>
+ </broker>
+ */
+ public void setCurrentAttributeName (String name)
+ {
+ switch (Tag.get(name))
+ {
+ case HOST:
+ {
+ _connectionData.setHost(_currentValue);
+ break;
+ }
+ case PORT :
+ {
+ _connectionData.setPort(_currentValue);
+ break;
+ }
+ case VIRTUAL_HOST:
+ {
+ _connectionData.setVirtualHost(_currentValue);
+ break;
+ }
+ case USER :
+ {
+ _connectionData.setUsername(_currentValue);
+ break;
+ }
+ case MAX_POOL_CAPACITY:
+ {
+ _connectionData.setMaxPoolCapacity (_currentValue);
+ break;
+ }
+ case INITIAL_POOL_CAPACITY:
+ {
+ _connectionData.setInitialPoolCapacity(_currentValue);
+ break;
+ }
+ case MAX_WAIT_TIMEOUT:
+ {
+ _connectionData.setMaxWaitTimeout(_currentValue);
+ break;
+ }
+ case PASSWORD:
+ {
+ _connectionData.setPassword(_currentValue);
+ break;
+ }
+ case BROKER:
+ {
+ try
+ {
+ Configuration.getInstance().addBrokerConnectionData(getUUId(),_connectionData);
+ } catch(Exception exception)
+ {
+ LOGGER.error(exception, Messages.QMAN_100007_UNABLE_TO_CONNECT_WITH_BROKER, _connectionData);
+ }
+ _connectionData = new BrokerConnectionData();
+ break;
+ }
+ }
+ }
+
+ /**
+ * Gets an uuid in order to associate current connection data with a broker.
+ * @return
+ */
+ UUID getUUId(){
+ return UUID.randomUUID();
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionException.java
new file mode 100644
index 0000000000..9294cf740e
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionException.java
@@ -0,0 +1,42 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.management.configuration;
+
+/**
+ * Thrown when a connection to a broker cannot be estabilished.
+ *
+ * @author Andrea Gazzarini
+ */
+public class BrokerConnectionException extends Exception
+{
+ private static final long serialVersionUID = 8170112238862494025L;
+
+ /**
+ * Builds a new exception with the given cause.
+ *
+ * @param cause the exception cause.
+ */
+ BrokerConnectionException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
new file mode 100644
index 0000000000..ab90ec294b
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
@@ -0,0 +1,485 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.Map.Entry;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Names;
+import org.apache.qpid.management.domain.handler.base.IMessageHandler;
+import org.apache.qpid.management.domain.model.AccessMode;
+import org.apache.qpid.management.domain.model.type.AbsTime;
+import org.apache.qpid.management.domain.model.type.DeltaTime;
+import org.apache.qpid.management.domain.model.type.ObjectReference;
+import org.apache.qpid.management.domain.model.type.Str16;
+import org.apache.qpid.management.domain.model.type.Str8;
+import org.apache.qpid.management.domain.model.type.Type;
+import org.apache.qpid.management.domain.model.type.Uint16;
+import org.apache.qpid.management.domain.model.type.Uint32;
+import org.apache.qpid.management.domain.model.type.Uint64;
+import org.apache.qpid.management.domain.model.type.Uint8;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Qpid Management bridge configuration.
+ * Basically iy is a singleton that is holding all the configurtion data loaded at startup.
+ */
+public final class Configuration
+{
+ private final static Logger LOGGER = Logger.get(Configuration.class);
+ private static Configuration INSTANCE = new Configuration();
+
+ // Work Manager default settings
+ private int _poolSize = 5;
+ private int _maxPoolSize = 15;
+ private long _keepAliveTime = 5000;
+
+ Map<Integer, Type> _typeMappings = new HashMap<Integer,Type>();
+ Map<Integer,AccessMode> _accessModes = new HashMap<Integer, AccessMode>();
+ Map<Type,String> _validators = new HashMap<Type, String>();
+
+ Map<UUID,BrokerConnectionData> _brokerConnectionInfos = new HashMap<UUID, BrokerConnectionData>();
+
+ Map<Character, IMessageHandler> _managementQueueHandlers = new HashMap<Character, IMessageHandler>();
+ Map<Character, IMessageHandler> _methodReplyQueueHandlers = new HashMap<Character, IMessageHandler>();
+
+ private String _managementQueueName;
+ private String _methodReplyQueueName;
+
+ private Header _headerForCommandMessages;
+ private DeliveryProperties _deliveryProperties = new DeliveryProperties();
+ private MessageProperties _messageProperties = new MessageProperties();
+
+ // Private constructor.
+ private Configuration()
+ {
+ defineQueueNames();
+
+ createHeaderForCommandMessages();
+
+ addAccessModeMappings();
+
+ addTypeMappings();
+ }
+
+
+ /**
+ * Returns the singleton instance.
+ *
+ * @return the singleton instance.
+ */
+ public static Configuration getInstance ()
+ {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns true if this configuration has at least
+ * one broker configured.
+ *
+ * @return true if this configuration has at least one
+ * broker configured.
+ */
+ public boolean hasOneOrMoreBrokersDefined()
+ {
+ return !_brokerConnectionInfos.isEmpty();
+ }
+
+ /**
+ * Returns the type associated to the given code.
+ *
+ * @param code the code used as search criteria.
+ * @return the type associated to the given code.
+ * @throws UnknownTypeCodeException when the given code is not associated to any type.
+ */
+ public Type getType(int code) throws UnknownTypeCodeException
+ {
+ Type result = _typeMappings.get(code);
+ if (result == null)
+ {
+ throw new UnknownTypeCodeException(code);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the access mode associated to the given code.
+ *
+ * @param code the code used as search criteria.
+ * @return the access mode associated to the given code.
+ * @throws UnknownAccessCodeException when the given code is not associated to any access mode.
+ */
+ public AccessMode getAccessMode(int code) throws UnknownAccessCodeException
+ {
+ AccessMode result = _accessModes.get(code);
+ if (result == null)
+ {
+ throw new UnknownAccessCodeException(code);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the validator class name associated to the given type.
+ *
+ * @param type the type.
+ * @return the validator class name associated to the given type.
+ */
+ public String getValidatorClassName (Type type)
+ {
+ return _validators.get(type);
+ }
+
+ /**
+ * Gets from this configuration the list of known broker (I mean, only their connection data).
+ *
+ * @return the list of known broker
+ */
+ public Set<Entry<UUID, BrokerConnectionData>> getConnectionInfos(){
+ return _brokerConnectionInfos.entrySet();
+ }
+
+ /**
+ * Gets from this configuration the connection data of the broker associated with the given id.
+ *
+ * @param brokerId the broker identifier.
+ * @return the connection data of the broker associated with the given id.
+ * @throws UnknownBrokerException when the given id is not associated with any broker.
+ */
+ public BrokerConnectionData getBrokerConnectionData (UUID brokerId) throws UnknownBrokerException
+ {
+ BrokerConnectionData connectionData = _brokerConnectionInfos.get(brokerId);
+ if (connectionData == null)
+ {
+ throw new UnknownBrokerException(brokerId);
+ }
+ return _brokerConnectionInfos.get(brokerId);
+ }
+
+ /**
+ * Returns the name of the management queue.
+ *
+ * @return the name of the management queue.
+ */
+ public String getManagementQueueName() {
+ return _managementQueueName;
+ }
+
+ /**
+ * Returns the name of the method-reply queue.
+ *
+ * @return the name of the method-reply queue.
+ */
+ public String getMethodReplyQueueName() {
+ return _methodReplyQueueName;
+ }
+
+ /**
+ * Returns a map containing all the configured management message handlers.
+ * A management message handler it is a basically a processor for a management queue incoming message associated
+ * with a specific opcode.
+ *
+ * @return a map containing all the configured management message handlers.
+ */
+ public Map<Character, IMessageHandler> getManagementQueueHandlers()
+ {
+ return _managementQueueHandlers;
+ }
+
+ /**
+ * Returns a map containing all the configured method-reply message handlers.
+ * A management message handler it is a basically a processor for a method-reply queue incoming message associated
+ * with a specific opcode.
+ *
+ * @return a map containing all the configured method-reply message handlers.
+ */
+ public Map<Character, IMessageHandler> getMethodReplyQueueHandlers()
+ {
+ return _methodReplyQueueHandlers;
+ }
+
+ /**
+ * Returns the message header used for sending command message on management queue.
+ *
+ * @return the message header used for sending command message on management queue.
+ */
+ public Header getCommandMessageHeader ()
+ {
+ return _headerForCommandMessages;
+ }
+
+ /**
+ * Returns the command message properties.
+ *
+ * @return the command message properties.
+ */
+ public MessageProperties getCommandMessageProperties ()
+ {
+ return _messageProperties;
+ }
+
+ /**
+ * Returns the command message delivery properties.
+ *
+ * @return the command message delivery properties.
+ */
+ public DeliveryProperties getCommandDeliveryProperties ()
+ {
+ return _deliveryProperties;
+ }
+
+ /**
+ * Adds a new type mapping to this configuration.
+ *
+ * @param code the code that will be associated with the declared type.
+ * @param type the type.
+ * @param vailidatorClassName the FQN of the validator class that will be
+ * associated with the given type.
+ */
+ void addTypeMapping(int code, Type type, String validatorClassName) {
+ _typeMappings.put(code, type);
+ _validators.put(type, validatorClassName);
+
+ LOGGER.info(
+ Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED,
+ code,
+ type,
+ validatorClassName);
+ }
+
+
+ /**
+ * Adds a new type mapping to this configuration.
+ *
+ * @param code the code that will be associated with the declared type.
+ * @param type the type.
+ */
+ void addTypeMapping(int code, Type type) {
+ _typeMappings.put(code, type);
+
+ LOGGER.info(
+ Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED,
+ code,
+ type,
+ "not configured for this type.");
+ }
+
+ /**
+ * Adds a new access mode mapping to this configuration.
+ *
+ * @param code the code that will be associated with the access mode,
+ * @param accessMode the accessMode.
+ */
+ void addAccessModeMapping(int code, AccessMode accessMode){
+ _accessModes.put(code, accessMode);
+
+ LOGGER.info(Messages.QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED, code,accessMode);
+ }
+
+ /**
+ * Adds a new management message handler to this configuration.
+ * The incoming mapping object will contains an opcode and the class (as a string) of the message handler that will be used
+ * for processing incoming messages with that opcode.
+ *
+ * @param mapping the message handler mapping.
+ */
+ void addManagementMessageHandlerMapping (MessageHandlerMapping mapping)
+ {
+ Character opcode = mapping.getOpcode();
+ IMessageHandler handler = mapping.getMessageHandler();
+ _managementQueueHandlers.put(opcode, handler);
+
+ LOGGER.info(Messages.QMAN_000007_MANAGEMENT_HANDLER_MAPPING_CONFIGURED, opcode,handler.getClass().getName());
+ }
+
+ /**
+ * Adds a new method-reply message handler to this configuration.
+ * The incoming mapping object will contains an opcode and the class (as a string) of the message handler that will be used
+ * for processing incoming messages with that opcode.
+ *
+ * @param mapping the message handler mapping.
+ */
+ void addMethodReplyMessageHandlerMapping (MessageHandlerMapping mapping)
+ {
+ Character opcode = mapping.getOpcode();
+ IMessageHandler handler = mapping.getMessageHandler();
+ _methodReplyQueueHandlers.put(opcode, handler);
+
+ LOGGER.info(Messages.QMAN_000008_METHOD_REPLY_HANDLER_MAPPING_CONFIGURED, opcode,handler.getClass().getName());
+ }
+
+ /**
+ * Adds to this configuration a new broker connection data.
+ *
+ * @param brokerId the broker identifier.
+ * @param connectionData the connection data.
+ * @throws BrokerAlreadyConnectedException when the broker is already connected.
+ * @throws BrokerConnectionException when a connection cannot be estabilished.
+ */
+ void addBrokerConnectionData (UUID brokerId, BrokerConnectionData connectionData) throws BrokerAlreadyConnectedException, BrokerConnectionException
+ {
+ if (_brokerConnectionInfos.containsValue(connectionData))
+ {
+ throw new BrokerAlreadyConnectedException(connectionData);
+ }
+
+ try
+ {
+ QpidDatasource.getInstance().addConnectionPool(brokerId, connectionData);
+ _brokerConnectionInfos.put(brokerId,connectionData);
+
+ LOGGER.info(Messages.QMAN_000009_BROKER_DATA_CONFIGURED,brokerId,connectionData);
+ } catch(Exception exception)
+ {
+ throw new BrokerConnectionException(exception);
+ }
+
+ }
+
+ /**
+ * Header for command messages is created once because it only contains static values.
+ */
+ private void createHeaderForCommandMessages ()
+ {
+ ReplyTo replyTo=new ReplyTo();
+ replyTo.setRoutingKey(_methodReplyQueueName);
+ _messageProperties.setReplyTo(replyTo);
+ _deliveryProperties.setRoutingKey(Names.AGENT_ROUTING_KEY);
+ _headerForCommandMessages = new Header(_deliveryProperties, _messageProperties);
+ }
+
+ /**
+ * Creates the name of the queues used by this service.
+ * This is done because if a broker should be managed by one or more management client, then each of them
+ * must have its own channels to communicate with.
+ */
+ private void defineQueueNames()
+ {
+ UUID uuid = UUID.randomUUID();
+ _managementQueueName = Names.MANAGEMENT_QUEUE_PREFIX+uuid;
+ _methodReplyQueueName = Names.METHOD_REPLY_QUEUE_PREFIX+uuid;
+
+ LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName);
+ LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName);
+ }
+
+ /**
+ * Returns the worker manager thread pool size.
+ *
+ * @return the worker manager thread pool size.
+ */
+ public int getWorkerManagerPoolSize()
+ {
+ return _poolSize;
+ }
+
+ /**
+ * Sets the size of the worker manager thread pool.
+ *
+ * @param poolSize the size of the worker manager thread pool.
+ */
+ void setWorkerManagerPoolSize(int poolSize)
+ {
+ this._poolSize = poolSize;
+ }
+
+ /**
+ * Returns the maximum size of the worker manager
+ * thread pool size.
+ *
+ * @return the max size of the worker manager thread pool.
+ */
+ public int getWorkerManagerMaxPoolSize()
+ {
+ return _maxPoolSize;
+ }
+
+ /**
+ * Sets the maximum size of the worker manager
+ * thread pool size.
+ *
+ * @param maxPoolSize the max size of the worker manager thread pool.
+ */
+ void setWorkerManagerMaxPoolSize(int maxPoolSize)
+ {
+ this._maxPoolSize = maxPoolSize;
+ }
+
+ /**
+ * Returns the max amount of time that an excess thread
+ * can be idle before purging from the pool.
+ *
+ * @return the max keep alive time.
+ */
+ public long getWorkerManagerKeepAliveTime()
+ {
+ return _keepAliveTime;
+ }
+
+ /**
+ * Sets the max amount of time that an excess thread
+ * can be idle before purging from the pool.
+ *
+ * @param keepAliveTime the max keep alive time.
+ */
+ void setWorkerManagerKeepAliveTime(long keepAliveTime)
+ {
+ this._keepAliveTime = keepAliveTime;
+ }
+
+ /**
+ * Configures access mode mappings.
+ * An access mode mapping is an association between a code and an access mode.
+ */
+ private void addAccessModeMappings() {
+ addAccessModeMapping(1,AccessMode.RC);
+ addAccessModeMapping(2,AccessMode.RW);
+ addAccessModeMapping(3,AccessMode.RO);
+ }
+
+ /**
+ * Configures type mappings.
+ * A type mapping is an association between a code and a management type.
+ */
+ private void addTypeMappings()
+ {
+ addTypeMapping(1,new Uint8(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(2,new Uint16(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(3,new Uint32(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(4,new Uint64(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(6,new Str8(),Names.STRING_VALIDATOR);
+ addTypeMapping(7,new Str16(),Names.STRING_VALIDATOR);
+ addTypeMapping(8,new AbsTime());
+ addTypeMapping(9,new DeltaTime());
+ addTypeMapping(10,new ObjectReference());
+ addTypeMapping(11,new org.apache.qpid.management.domain.model.type.Boolean());
+ addTypeMapping(14,new org.apache.qpid.management.domain.model.type.Uuid());
+ addTypeMapping(15,new org.apache.qpid.management.domain.model.type.Map());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ConfigurationException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ConfigurationException.java
new file mode 100644
index 0000000000..6eed515e11
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ConfigurationException.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+/**
+ * Thrown when a problem is encountered during building the configuration.
+ *
+ * @author Andrea Gazzarini
+ */
+public class ConfigurationException extends Exception
+{
+ private static final long serialVersionUID = 8238481177714286259L;
+
+ public ConfigurationException(String msg)
+ {
+ super(msg);
+ }
+
+ /**
+ * Builds a new ConfigurationException with the given cause.
+ *
+ * @param exception the exception cause.
+ */
+ public ConfigurationException(Exception exception)
+ {
+ super(exception);
+ }
+
+ public ConfigurationException(String msg,Exception exception)
+ {
+ super(msg,exception);
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
new file mode 100644
index 0000000000..fe44c6aff7
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.UUID;
+
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Names;
+import org.apache.qpid.management.Protocol;
+import org.apache.qpid.management.domain.handler.impl.ConfigurationMessageHandler;
+import org.apache.qpid.management.domain.handler.impl.EventContentMessageHandler;
+import org.apache.qpid.management.domain.handler.impl.HeartBeatIndicationMessageHandler;
+import org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler;
+import org.apache.qpid.management.domain.handler.impl.MethodResponseMessageHandler;
+import org.apache.qpid.management.domain.handler.impl.SchemaResponseMessageHandler;
+import org.apache.qpid.transport.util.Logger;
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * Director used for coordinating the build process of configuration.
+ * This is the only component which has a read-write permission on Configuration object.
+ */
+public class Configurator extends DefaultHandler
+{
+ private final static Logger LOGGER = Logger.get(Configurator.class);
+
+ /**
+ * Default (empty) parser used when there's no need to process data (non relevant elements).
+ */
+ final static IParser DEFAULT_PARSER = new IParser() {
+
+ public void setCurrrentAttributeValue (String value)
+ {
+ }
+
+ public void setCurrentAttributeName (String name)
+ {
+ }
+ };
+
+ IParser _brokerConfigurationParser = new BrokerConnectionDataParser();
+ IParser _workerManagerConfigurationParser = new WorkerManagerConfigurationParser();
+ IParser _currentParser = DEFAULT_PARSER;
+
+ /**
+ * Delegates the processing to the current parser.
+ */
+ @Override
+ public void characters (char[] ch, int start, int length) throws SAXException
+ {
+ String value = new String(ch,start,length).trim();
+ if (value.length() != 0) {
+ _currentParser.setCurrrentAttributeValue(value);
+ }
+ }
+
+ /**
+ * Here is defined what parser needs to be used for processing the current data.
+ */
+ @Override
+ public void startElement (String uri, String localName, String name, Attributes attributes) throws SAXException
+ {
+ switch(Tag.get(name))
+ {
+ case BROKERS :
+ {
+ _currentParser = _brokerConfigurationParser;
+ break;
+ }
+ case WORK_MANAGER :
+ {
+ _currentParser = _workerManagerConfigurationParser;
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void endElement (String uri, String localName, String name) throws SAXException
+ {
+ _currentParser.setCurrentAttributeName(name);
+ }
+
+ /**
+ * Builds whole configuration.
+ *
+ * @throws ConfigurationException when the build fails.
+ */
+ public void configure() throws ConfigurationException
+ {
+ BufferedReader reader = null;
+ try
+ {
+ String initialConfigFileName = System.getProperty(Names.QMAN_CONFIG_OPTION_NAME);
+ if (initialConfigFileName != null && initialConfigFileName.trim().length() != 0)
+ {
+ File initialConfigurationFile = new File(initialConfigFileName);
+ if (initialConfigurationFile.canRead())
+ {
+ SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
+ reader = new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(initialConfigFileName)));
+ InputSource source = new InputSource(reader);
+ parser.parse(source, this);
+ } else {
+ LOGGER.warn(
+ Messages.QMAN_300004_INVALID_CONFIGURATION_FILE,
+ initialConfigFileName);
+ }
+ }
+
+ addMandatoryManagementMessageHandlers();
+ addMandatoryMethodReplyMessageHandlers();
+ } catch (Exception exception)
+ {
+ throw new ConfigurationException(exception);
+ } finally
+ {
+ try
+ {
+ reader.close();
+ } catch (Exception ignore)
+ {
+ }
+ }
+ }
+
+ /**
+ * Creates and return a value object (BrokerConnectionData) with the given parameters.
+ * Note that that object will be stored on configuration and it could be used to set a connection with the broker.
+ * This happens when the "initialPoolCapacity" is greater than 0 : in this case the caller is indicatinf that it wants to open
+ * one or more connections immediately at startup and therefore Q-Man will try to do that.
+ *
+ * @param host the hostname where the broker is running.
+ * @param port the port where the broker is running.
+ * @param username the username for connecting with the broker.
+ * @param password the password for connecting with the broker.
+ * @param virtualHost the virtual host.
+ * @param initialPoolCapacity the number of the connection that must be immediately opened.
+ * @param maxPoolCapacity the maximum number of opened connection.
+ * @param maxWaitTimeout the maximum amount of time that a client will wait for obtaining a connection.
+ * @return the value object containing the data above.
+ * @throws BrokerAlreadyConnectedException when the broker is already connected.
+ * @throws BrokerConnectionException when a connection cannot be estabilished.
+ */
+ public BrokerConnectionData createAndReturnBrokerConnectionData(
+ UUID brokerId,
+ String host,
+ int port,
+ String username,
+ String password,
+ String virtualHost,
+ int initialPoolCapacity,
+ int maxPoolCapacity,
+ long maxWaitTimeout) throws BrokerAlreadyConnectedException, BrokerConnectionException
+ {
+ BrokerConnectionData data = new BrokerConnectionData(
+ host,
+ port,
+ virtualHost,
+ username,
+ password,
+ initialPoolCapacity,
+ maxPoolCapacity,
+ maxWaitTimeout);
+ Configuration.getInstance().addBrokerConnectionData(brokerId, data);
+ return data;
+ }
+
+ /**
+ * Configures the mandatory management message handlers.
+ */
+ void addMandatoryMethodReplyMessageHandlers ()
+ {
+ Configuration.getInstance().addMethodReplyMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.OPERATION_INVOCATION_RESPONSE_OPCODE,
+ new MethodResponseMessageHandler()));
+
+ Configuration.getInstance().addMethodReplyMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.SCHEMA_RESPONSE_OPCODE,
+ new SchemaResponseMessageHandler()));
+ }
+
+ /**
+ * Configures the mandatory management message handlers.
+ */
+ void addMandatoryManagementMessageHandlers ()
+ {
+ Configuration.getInstance().addManagementMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.INSTRUMENTATION_CONTENT_RESPONSE_OPCODE,
+ new InstrumentationMessageHandler()));
+
+ Configuration.getInstance().addManagementMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.CONFIGURATION_CONTENT_RESPONSE_OPCDE,
+ new ConfigurationMessageHandler()));
+
+ Configuration.getInstance().addManagementMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.EVENT_CONTENT_RESPONSE_OPCDE,
+ new EventContentMessageHandler()));
+
+ Configuration.getInstance().addManagementMessageHandlerMapping(
+ new MessageHandlerMapping(
+ Protocol.HEARTBEAT_INDICATION_RESPONSE_OPCODE,
+ new HeartBeatIndicationMessageHandler()));
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/IParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/IParser.java
new file mode 100644
index 0000000000..a221686765
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/IParser.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+/**
+ * Interface definition for configuration parser
+ * Concrete implementors are responsible for parsing a specific XML part of configuration data.
+ *
+ * @author Andrea Gazzarini
+ */
+interface IParser
+{
+ /**
+ * Main director callback : Sets the name of the current attribute.
+ *
+ * @param name the name of the current attribute.
+ */
+ void setCurrentAttributeName(String name);
+
+ /**
+ * Main director callback : sets the value of the current attribute.
+ *
+ * @param value the value of the current attribute.
+ */
+ void setCurrrentAttributeValue(String value);
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java
new file mode 100644
index 0000000000..b02fb789cc
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import org.apache.qpid.management.domain.handler.base.IMessageHandler;
+
+/**
+ * Message Handler mapping used for associating an opcode with a message handler.
+ */
+class MessageHandlerMapping
+{
+ private final Character _opcode;
+ private final IMessageHandler _handler;
+
+ /**
+ * Builds a new mapping with the given opcode and handler class.
+ *
+ * @param opcode the opcode.
+ * @param handlerClass the handler class.
+ */
+ MessageHandlerMapping(Character opcode, IMessageHandler handler)
+ {
+ this._opcode = opcode;
+ this._handler = handler;
+ }
+
+ /**
+ * Returns the opcode of this mapping.
+ *
+ * @return the code of this mapping.
+ */
+ Character getOpcode ()
+ {
+ return _opcode;
+ }
+
+ /**
+ * Returns the message handler for this mapping.
+ *
+ * @return the message handler for this mapping.
+ */
+ IMessageHandler getMessageHandler()
+ {
+ return _handler;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
new file mode 100644
index 0000000000..569a65a782
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPoolFactory;
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Qpid datasource.
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
+ * @author Andrea Gazzarini
+ */
+public final class QpidDatasource
+{
+ private final static Logger LOGGER = Logger.get(QpidDatasource.class);
+
+ /**
+ * A connection decorator used for adding pool interaction behaviour to an existing connection.
+ *
+ * @author Andrea Gazzarini
+ */
+ class PooledConnection extends Connection
+ {
+ private final UUID _brokerId;
+ private boolean _valid;
+
+ /**
+ * Builds a new decorator with the given connection.
+ *
+ * @param brokerId the broker identifier.
+ */
+ private PooledConnection(UUID brokerId)
+ {
+ this._brokerId = brokerId;
+ _valid = true;
+ }
+
+ /**
+ * Returns true if the underlying connection is still valid and can be used.
+ *
+ * @return true if the underlying connection is still valid and can be used.
+ */
+ boolean isValid()
+ {
+ return _valid;
+ }
+
+ void reallyClose()
+ {
+ super.close();
+ }
+
+ /**
+ * Returns the connection to the pool. That is, marks this connections as available.
+ * After that, this connection will be available for further operations.
+ */
+ public void close()
+ {
+ try
+ {
+ pools.get(_brokerId).returnObject(this);
+
+ LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this);
+ }
+ catch (Exception e)
+ {
+ throw new ConnectionException(e);
+ }
+ }
+
+ public void exception(Throwable t)
+ {
+ //super.exception(t);
+ _valid = false;
+ }
+ }
+
+ /**
+ * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
+ * the broker connection(s).
+ *
+ * @author Andrea Gazzarini
+ */
+ class QpidConnectionFactory extends BasePoolableObjectFactory
+ {
+ private final BrokerConnectionData _connectionData;
+ private final UUID _brokerId;
+
+ /**
+ * Builds a new connection factory with the given parameters.
+ *
+ * @param brokerId the broker identifier.
+ * @param connectionData the connecton data.
+ */
+ private QpidConnectionFactory(UUID brokerId, BrokerConnectionData connectionData)
+ {
+ this._connectionData = connectionData;
+ this._brokerId = brokerId;
+ }
+
+ /**
+ * Creates a new underlying connection.
+ */
+ @Override
+ public Connection makeObject () throws Exception
+ {
+ PooledConnection connection = new PooledConnection(_brokerId);
+ connection.connect(
+ _connectionData.getHost(),
+ _connectionData.getPort(),
+ _connectionData.getVirtualHost(),
+ _connectionData.getUsername(),
+ _connectionData.getPassword(),
+ false);
+ return connection;
+ }
+
+ /**
+ * Validates the underlying connection.
+ */
+ @Override
+ public boolean validateObject (Object obj)
+ {
+ PooledConnection connection = (PooledConnection) obj;
+ boolean isValid = connection.isValid();
+
+ LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid);
+
+ return isValid;
+ }
+
+ /**
+ * Closes the underlying connection.
+ */
+ @Override
+ public void destroyObject (Object obj) throws Exception
+ {
+ try
+ {
+ PooledConnection connection = (PooledConnection) obj;
+ connection.reallyClose();
+
+ LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED);
+ } catch (Exception exception)
+ {
+ LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE);
+ }
+ }
+ }
+
+ // Singleton instance.
+ private static QpidDatasource instance = new QpidDatasource();
+
+ // Each entry contains a connection pool for a specific broker.
+ private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
+
+ // Private constructor.
+ private QpidDatasource()
+ {
+ }
+
+ /**
+ * Gets an available connection from the pool of the given broker.
+ *
+ * @param brokerId the broker identifier.
+ * @return a valid connection to the broker associated with the given identifier.
+ */
+ public Connection getConnection(UUID brokerId) throws Exception
+ {
+ return (Connection) pools.get(brokerId).borrowObject();
+ }
+
+ /**
+ * Entry point method for retrieving the singleton instance of this datasource.
+ *
+ * @return the qpid datasource singleton instance.
+ */
+ public static QpidDatasource getInstance()
+ {
+ return instance;
+ }
+
+ /**
+ * Adds a connection pool to this datasource.
+ *
+ * @param brokerId the broker identifier that will be associated with the new connection pool.
+ * @param connectionData the broker connection data.
+ * @throws Exception when the pool cannot be created.
+ */
+ void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception
+ {
+ GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
+ new QpidConnectionFactory(brokerId,connectionData),
+ connectionData.getMaxPoolCapacity(),
+ GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
+ connectionData.getMaxWaitTimeout(),-1,
+ true,
+ false);
+
+ ObjectPool pool = factory.createPool();
+
+ // Open connections at startup according to initial capacity param value.
+ int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity();
+ Object [] openStartupList = new Object[howManyConnectionAtStartup];
+
+ // Open...
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
+ {
+ openStartupList[index] = pool.borrowObject();
+ }
+
+ // ...and immediately return them to pool. In this way the pooled connection has been opened.
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
+ {
+ pool.returnObject(openStartupList[index]);
+ }
+
+ pools.put(brokerId,pool);
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
new file mode 100644
index 0000000000..c2b6e1e27d
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
@@ -0,0 +1,54 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.management.configuration;
+
+/**
+ * Configuration Tag catalogue.
+ *
+ * @author Andrea Gazzarini
+ */
+public enum Tag {
+ CONFIGURATION { @Override public String toString() { return "configuration"; }},
+ BROKER { @Override public String toString() { return "broker"; }},
+ HOST { @Override public String toString() { return "host"; }},
+ PORT { @Override public String toString() { return "port"; }},
+ MAX_POOL_CAPACITY { @Override public String toString() { return "max-pool-capacity"; }},
+ MAX_WAIT_TIMEOUT { @Override public String toString() { return "max-wait-timeout"; }},
+ INITIAL_POOL_CAPACITY { @Override public String toString() { return "initial-pool-capacity"; }},
+ VIRTUAL_HOST { @Override public String toString() { return "virtual-host"; }},
+ USER { @Override public String toString() { return "user"; }},
+ PASSWORD { @Override public String toString() { return "password"; }},
+ BROKERS { @Override public String toString() { return "brokers"; }},
+ WORK_MANAGER { @Override public String toString() { return "work-manager"; }},
+ POOL_CAPACITY { @Override public String toString() { return "pool-capacity"; }},
+ KEEP_ALIVE_TIME { @Override public String toString() { return "keep-alive-time"; }};
+
+ /**
+ * Returns the enum entry associated to the given tag name.
+ *
+ * @param name the name of tag.
+ * @return the enum entry associated to the given tag name.
+ */
+ public static Tag get(String name) {
+ return valueOf(name.replaceAll("-", "_").toUpperCase());
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownAccessCodeException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownAccessCodeException.java
new file mode 100644
index 0000000000..b7f1c0a7ec
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownAccessCodeException.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+/**
+ * Thrown when no access mode is found in configuration associated to the given code.
+ *
+ * @author Andrea Gazzarini
+ */
+public class UnknownAccessCodeException extends Exception
+{
+ private static final long serialVersionUID = 2350963503092509119L;
+ private final int _code;
+
+ /**
+ * Builds a new UnknownAccessCodeException with the given code.
+ *
+ * @param code the access code.
+ */
+ UnknownAccessCodeException(int code)
+ {
+ super(String.valueOf(code));
+ this._code = code;
+ }
+
+ /**
+ * Returns the unknown code.
+ *
+ * @return the unknown code.
+ */
+ public int getCode ()
+ {
+ return _code;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownBrokerException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownBrokerException.java
new file mode 100644
index 0000000000..5b08e09c24
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownBrokerException.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.UUID;
+
+/**
+ * Thrown when someone requests connection data for an unknown broker.
+ *
+ * @author Andrea Gazzarini
+ */
+public class UnknownBrokerException extends Exception
+{
+ private static final long serialVersionUID = 4965395428832158924L;
+
+ /**
+ * Builds a new UnknownBrokerException with the given broker id.
+ *
+ * @param brokerId the broker identifier.
+ */
+ UnknownBrokerException(UUID brokerId)
+ {
+ super(String.valueOf(brokerId));
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownTypeCodeException.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownTypeCodeException.java
new file mode 100644
index 0000000000..57005d21e5
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/UnknownTypeCodeException.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+/**
+ * Thrown when no type is found in configuration associated to the given code.
+ *
+ * @author Andrea Gazzarini
+ */
+public class UnknownTypeCodeException extends Exception
+{
+ private static final long serialVersionUID = 5440934037645111591L;
+ private int _code;
+
+ /**
+ * Builds a new UnknownTypeCodeException with the given code.
+ *
+ * @param code the access code.
+ */
+ UnknownTypeCodeException(int code)
+ {
+ super(String.valueOf(code));
+ this._code = code;
+ }
+
+ /**
+ * Returns the unknown code.
+ *
+ * @return the unknown code.
+ */
+ public int getCode ()
+ {
+ return _code;
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
new file mode 100644
index 0000000000..4e68b54b12
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.UUID;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Parser used for building worker manager settings.
+ * The corresponding section of the configuration file is :
+ *
+ <work-manager>
+ <pool-capacity>5</pool-capacity>
+ <max-pool-capacity>15</max-pool-capacity>
+ <keep-alive-time>5000</keep-alive-time>
+ </work-manager>
+
+ *
+ * @author Andrea Gazzarini
+ */
+class WorkerManagerConfigurationParser implements IParser
+{
+ private final static Logger LOGGER = Logger.get(Configuration.class);
+ private String _currentValue;
+
+ private String _poolSizeAsString;
+ private String _maxPoolSizeAsString;
+ private String _keepAliveTimeAsString;
+
+ /**
+ * Callback : the given value is the text content of the current node.
+ */
+ public void setCurrrentAttributeValue (String value)
+ {
+ this._currentValue = value;
+ }
+
+ /**
+ * Callback: each time the end of an element is reached
+ * this method is called.
+ */
+ public void setCurrentAttributeName (String name)
+ {
+ switch (Tag.get(name))
+ {
+ case POOL_CAPACITY:
+ {
+ _poolSizeAsString = _currentValue.trim();
+ break;
+ }
+ case MAX_POOL_CAPACITY :
+ {
+ _maxPoolSizeAsString = _currentValue;
+ break;
+ }
+ case KEEP_ALIVE_TIME:
+ {
+ _keepAliveTimeAsString = _currentValue;
+ break;
+ }
+ case WORK_MANAGER:
+ {
+ Configuration configuration = Configuration.getInstance();
+ try
+ {
+ configuration.setWorkerManagerPoolSize(Integer.parseInt(_poolSizeAsString));
+ configuration.setWorkerManagerMaxPoolSize(Integer.parseInt(_maxPoolSizeAsString));
+ configuration.setWorkerManagerKeepAliveTime(Long.parseLong(_keepAliveTimeAsString));
+ } catch(Exception exception)
+ {
+ LOGGER.error(Messages.QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER);
+ } finally {
+ LOGGER.info(Messages.QMAN_000035_WORK_MANAGER_POOL_SIZE,configuration.getWorkerManagerPoolSize());
+ LOGGER.info(Messages.QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE,configuration.getWorkerManagerMaxPoolSize());
+ LOGGER.info(Messages.QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME,configuration.getWorkerManagerKeepAliveTime());
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Gets an uuid in order to associate current connection data with a broker.
+ * @return
+ */
+ UUID getUUId(){
+ return UUID.randomUUID();
+ }
+}