summaryrefslogtreecommitdiff
path: root/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java')
-rw-r--r--java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java249
1 files changed, 249 insertions, 0 deletions
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
new file mode 100644
index 0000000000..569a65a782
--- /dev/null
+++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPoolFactory;
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Qpid datasource.
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
+ * @author Andrea Gazzarini
+ */
+public final class QpidDatasource
+{
+ private final static Logger LOGGER = Logger.get(QpidDatasource.class);
+
+ /**
+ * A connection decorator used for adding pool interaction behaviour to an existing connection.
+ *
+ * @author Andrea Gazzarini
+ */
+ class PooledConnection extends Connection
+ {
+ private final UUID _brokerId;
+ private boolean _valid;
+
+ /**
+ * Builds a new decorator with the given connection.
+ *
+ * @param brokerId the broker identifier.
+ */
+ private PooledConnection(UUID brokerId)
+ {
+ this._brokerId = brokerId;
+ _valid = true;
+ }
+
+ /**
+ * Returns true if the underlying connection is still valid and can be used.
+ *
+ * @return true if the underlying connection is still valid and can be used.
+ */
+ boolean isValid()
+ {
+ return _valid;
+ }
+
+ void reallyClose()
+ {
+ super.close();
+ }
+
+ /**
+ * Returns the connection to the pool. That is, marks this connections as available.
+ * After that, this connection will be available for further operations.
+ */
+ public void close()
+ {
+ try
+ {
+ pools.get(_brokerId).returnObject(this);
+
+ LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this);
+ }
+ catch (Exception e)
+ {
+ throw new ConnectionException(e);
+ }
+ }
+
+ public void exception(Throwable t)
+ {
+ //super.exception(t);
+ _valid = false;
+ }
+ }
+
+ /**
+ * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
+ * the broker connection(s).
+ *
+ * @author Andrea Gazzarini
+ */
+ class QpidConnectionFactory extends BasePoolableObjectFactory
+ {
+ private final BrokerConnectionData _connectionData;
+ private final UUID _brokerId;
+
+ /**
+ * Builds a new connection factory with the given parameters.
+ *
+ * @param brokerId the broker identifier.
+ * @param connectionData the connecton data.
+ */
+ private QpidConnectionFactory(UUID brokerId, BrokerConnectionData connectionData)
+ {
+ this._connectionData = connectionData;
+ this._brokerId = brokerId;
+ }
+
+ /**
+ * Creates a new underlying connection.
+ */
+ @Override
+ public Connection makeObject () throws Exception
+ {
+ PooledConnection connection = new PooledConnection(_brokerId);
+ connection.connect(
+ _connectionData.getHost(),
+ _connectionData.getPort(),
+ _connectionData.getVirtualHost(),
+ _connectionData.getUsername(),
+ _connectionData.getPassword(),
+ false);
+ return connection;
+ }
+
+ /**
+ * Validates the underlying connection.
+ */
+ @Override
+ public boolean validateObject (Object obj)
+ {
+ PooledConnection connection = (PooledConnection) obj;
+ boolean isValid = connection.isValid();
+
+ LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid);
+
+ return isValid;
+ }
+
+ /**
+ * Closes the underlying connection.
+ */
+ @Override
+ public void destroyObject (Object obj) throws Exception
+ {
+ try
+ {
+ PooledConnection connection = (PooledConnection) obj;
+ connection.reallyClose();
+
+ LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED);
+ } catch (Exception exception)
+ {
+ LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE);
+ }
+ }
+ }
+
+ // Singleton instance.
+ private static QpidDatasource instance = new QpidDatasource();
+
+ // Each entry contains a connection pool for a specific broker.
+ private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
+
+ // Private constructor.
+ private QpidDatasource()
+ {
+ }
+
+ /**
+ * Gets an available connection from the pool of the given broker.
+ *
+ * @param brokerId the broker identifier.
+ * @return a valid connection to the broker associated with the given identifier.
+ */
+ public Connection getConnection(UUID brokerId) throws Exception
+ {
+ return (Connection) pools.get(brokerId).borrowObject();
+ }
+
+ /**
+ * Entry point method for retrieving the singleton instance of this datasource.
+ *
+ * @return the qpid datasource singleton instance.
+ */
+ public static QpidDatasource getInstance()
+ {
+ return instance;
+ }
+
+ /**
+ * Adds a connection pool to this datasource.
+ *
+ * @param brokerId the broker identifier that will be associated with the new connection pool.
+ * @param connectionData the broker connection data.
+ * @throws Exception when the pool cannot be created.
+ */
+ void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception
+ {
+ GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
+ new QpidConnectionFactory(brokerId,connectionData),
+ connectionData.getMaxPoolCapacity(),
+ GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
+ connectionData.getMaxWaitTimeout(),-1,
+ true,
+ false);
+
+ ObjectPool pool = factory.createPool();
+
+ // Open connections at startup according to initial capacity param value.
+ int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity();
+ Object [] openStartupList = new Object[howManyConnectionAtStartup];
+
+ // Open...
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
+ {
+ openStartupList[index] = pool.borrowObject();
+ }
+
+ // ...and immediately return them to pool. In this way the pooled connection has been opened.
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
+ {
+ pool.returnObject(openStartupList[index]);
+ }
+
+ pools.put(brokerId,pool);
+ }
+}