diff options
Diffstat (limited to 'java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java')
-rw-r--r-- | java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java new file mode 100644 index 0000000000..569a65a782 --- /dev/null +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.configuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.commons.pool.ObjectPool; +import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.commons.pool.impl.GenericObjectPoolFactory; +import org.apache.qpid.management.Messages; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.util.Logger; + +/** + * Qpid datasource. + * Basically it is a connection pool manager used for optimizing broker connections usage. + * + * @author Andrea Gazzarini + */ +public final class QpidDatasource +{ + private final static Logger LOGGER = Logger.get(QpidDatasource.class); + + /** + * A connection decorator used for adding pool interaction behaviour to an existing connection. + * + * @author Andrea Gazzarini + */ + class PooledConnection extends Connection + { + private final UUID _brokerId; + private boolean _valid; + + /** + * Builds a new decorator with the given connection. + * + * @param brokerId the broker identifier. + */ + private PooledConnection(UUID brokerId) + { + this._brokerId = brokerId; + _valid = true; + } + + /** + * Returns true if the underlying connection is still valid and can be used. + * + * @return true if the underlying connection is still valid and can be used. + */ + boolean isValid() + { + return _valid; + } + + void reallyClose() + { + super.close(); + } + + /** + * Returns the connection to the pool. That is, marks this connections as available. + * After that, this connection will be available for further operations. + */ + public void close() + { + try + { + pools.get(_brokerId).returnObject(this); + + LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this); + } + catch (Exception e) + { + throw new ConnectionException(e); + } + } + + public void exception(Throwable t) + { + //super.exception(t); + _valid = false; + } + } + + /** + * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of + * the broker connection(s). + * + * @author Andrea Gazzarini + */ + class QpidConnectionFactory extends BasePoolableObjectFactory + { + private final BrokerConnectionData _connectionData; + private final UUID _brokerId; + + /** + * Builds a new connection factory with the given parameters. + * + * @param brokerId the broker identifier. + * @param connectionData the connecton data. + */ + private QpidConnectionFactory(UUID brokerId, BrokerConnectionData connectionData) + { + this._connectionData = connectionData; + this._brokerId = brokerId; + } + + /** + * Creates a new underlying connection. + */ + @Override + public Connection makeObject () throws Exception + { + PooledConnection connection = new PooledConnection(_brokerId); + connection.connect( + _connectionData.getHost(), + _connectionData.getPort(), + _connectionData.getVirtualHost(), + _connectionData.getUsername(), + _connectionData.getPassword(), + false); + return connection; + } + + /** + * Validates the underlying connection. + */ + @Override + public boolean validateObject (Object obj) + { + PooledConnection connection = (PooledConnection) obj; + boolean isValid = connection.isValid(); + + LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid); + + return isValid; + } + + /** + * Closes the underlying connection. + */ + @Override + public void destroyObject (Object obj) throws Exception + { + try + { + PooledConnection connection = (PooledConnection) obj; + connection.reallyClose(); + + LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED); + } catch (Exception exception) + { + LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE); + } + } + } + + // Singleton instance. + private static QpidDatasource instance = new QpidDatasource(); + + // Each entry contains a connection pool for a specific broker. + private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>(); + + // Private constructor. + private QpidDatasource() + { + } + + /** + * Gets an available connection from the pool of the given broker. + * + * @param brokerId the broker identifier. + * @return a valid connection to the broker associated with the given identifier. + */ + public Connection getConnection(UUID brokerId) throws Exception + { + return (Connection) pools.get(brokerId).borrowObject(); + } + + /** + * Entry point method for retrieving the singleton instance of this datasource. + * + * @return the qpid datasource singleton instance. + */ + public static QpidDatasource getInstance() + { + return instance; + } + + /** + * Adds a connection pool to this datasource. + * + * @param brokerId the broker identifier that will be associated with the new connection pool. + * @param connectionData the broker connection data. + * @throws Exception when the pool cannot be created. + */ + void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception + { + GenericObjectPoolFactory factory = new GenericObjectPoolFactory( + new QpidConnectionFactory(brokerId,connectionData), + connectionData.getMaxPoolCapacity(), + GenericObjectPool.WHEN_EXHAUSTED_BLOCK, + connectionData.getMaxWaitTimeout(),-1, + true, + false); + + ObjectPool pool = factory.createPool(); + + // Open connections at startup according to initial capacity param value. + int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity(); + Object [] openStartupList = new Object[howManyConnectionAtStartup]; + + // Open... + for (int index = 0; index < howManyConnectionAtStartup; index++) + { + openStartupList[index] = pool.borrowObject(); + } + + // ...and immediately return them to pool. In this way the pooled connection has been opened. + for (int index = 0; index < howManyConnectionAtStartup; index++) + { + pool.returnObject(openStartupList[index]); + } + + pools.put(brokerId,pool); + } +} |