diff options
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java')
-rw-r--r-- | qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java | 820 |
1 files changed, 820 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java new file mode 100644 index 0000000000..d56f520db4 --- /dev/null +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java @@ -0,0 +1,820 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Session; +import javax.jms.XASession; +import javax.resource.ResourceException; +import javax.resource.spi.ActivationSpec; +import javax.resource.spi.BootstrapContext; +import javax.resource.spi.ResourceAdapter; +import javax.resource.spi.ResourceAdapterInternalException; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.resource.spi.work.WorkManager; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.XAConnectionImpl; +import org.apache.qpid.ra.inflow.QpidActivation; +import org.apache.qpid.ra.inflow.QpidActivationSpec; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The resource adapter for Qpid + * + */ +public class QpidResourceAdapter implements ResourceAdapter, Serializable +{ + /** + * + */ + private static final long serialVersionUID = -2446231446818098726L; + + /** + * The logger + */ + private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class); + + /** + * The bootstrap context + */ + private BootstrapContext _ctx; + + /** + * The resource adapter properties + */ + private final QpidRAProperties _raProperties; + + /** + * Have the factory been configured + */ + private final AtomicBoolean _configured; + + /** + * The activations by activation spec + */ + private final Map<ActivationSpec, QpidActivation> _activations; + + private AMQConnectionFactory _defaultAMQConnectionFactory; + + private TransactionManager _tm; + + /** + * Constructor + */ + public QpidResourceAdapter() + { + if (_log.isTraceEnabled()) + { + _log.trace("constructor()"); + } + + _raProperties = new QpidRAProperties(); + _configured = new AtomicBoolean(false); + _activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>(); + } + + public TransactionManager getTM() + { + return _tm; + } + + /** + * Endpoint activation + * + * @param endpointFactory The endpoint factory + * @param spec The activation spec + * @throws ResourceException Thrown if an error occurs + */ + public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException + { + if (!_configured.getAndSet(true)) + { + try + { + setup(); + } + catch (QpidRAException e) + { + throw new ResourceException("Unable to create activation", e); + } + } + if (_log.isTraceEnabled()) + { + _log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")"); + } + + QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec)spec); + _activations.put(spec, activation); + activation.start(); + } + + /** + * Endpoint deactivation + * + * @param endpointFactory The endpoint factory + * @param spec The activation spec + */ + public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) + { + if (_log.isTraceEnabled()) + { + _log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")"); + } + + QpidActivation activation = _activations.remove(spec); + if (activation != null) + { + activation.stop(); + } + } + + /** + * Get XA resources + * + * @param specs The activation specs + * @return The XA resources + * @throws ResourceException Thrown if an error occurs or unsupported + */ + public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getXAResources(" + specs + ")"); + } + + return null; + } + + /** + * Start + * + * @param ctx The bootstrap context + * @throws ResourceAdapterInternalException + * Thrown if an error occurs + */ + public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException + { + if (_log.isTraceEnabled()) + { + _log.trace("start(" + ctx + ")"); + } + + locateTM(); + + this._ctx = ctx; + + _log.info("Qpid resource adapter started"); + } + + /** + * Stop + */ + public void stop() + { + if (_log.isTraceEnabled()) + { + _log.trace("stop()"); + } + + for (Map.Entry<ActivationSpec, QpidActivation> entry : _activations.entrySet()) + { + try + { + entry.getValue().stop(); + } + catch (Exception ignored) + { + _log.debug("Ignored", ignored); + } + } + + _activations.clear(); + + _log.info("Qpid resource adapter stopped"); + } + + /** + * Get the user name + * + * @return The value + */ + public String getDefaultUserName() + { + if (_log.isTraceEnabled()) + { + _log.trace("getUserName()"); + } + + return _raProperties.getUserName(); + } + + /** + * Set the user name + * + * @param userName The value + */ + public void setDefaultUserName(final String userName) + { + if (_log.isTraceEnabled()) + { + _log.trace("setUserName(" + userName + ")"); + } + + _raProperties.setUserName(userName); + } + + /** + * Get the password + * + * @return The value + */ + public String getDefaultPassword() + { + if (_log.isTraceEnabled()) + { + _log.trace("getPassword()"); + } + + return _raProperties.getPassword(); + } + + /** + * Set the password + * + * @param password The value + */ + public void setDefaultPassword(final String password) + { + if (_log.isTraceEnabled()) + { + _log.trace("setPassword(****)"); + } + + _raProperties.setPassword(password); + } + + /** + * Get the client ID + * + * @return The value + */ + public String getClientId() + { + if (_log.isTraceEnabled()) + { + _log.trace("getClientID()"); + } + + return _raProperties.getClientId(); + } + + /** + * Set the client ID + * + * @param clientID The client id + */ + public void setClientId(final String clientID) + { + if (_log.isTraceEnabled()) + { + _log.trace("setClientID(" + clientID + ")"); + } + + _raProperties.setClientId(clientID); + } + + /** + * Get the host + * + * @return The value + */ + public String getHost() + { + if (_log.isTraceEnabled()) + { + _log.trace("getHost()"); + } + + return _raProperties.getHost(); + } + + /** + * Set the host + * + * @param host The host + */ + public void setHost(final String host) + { + if (_log.isTraceEnabled()) + { + _log.trace("setHost(" + host + ")"); + } + + _raProperties.setHost(host); + } + + /** + * Get the port + * + * @return The value + */ + public Integer getPort() + { + if (_log.isTraceEnabled()) + { + _log.trace("getPort()"); + } + + return _raProperties.getPort(); + } + + /** + * Set the client ID + * + * @param port The port + */ + public void setPort(final Integer port) + { + if (_log.isTraceEnabled()) + { + _log.trace("setPort(" + port + ")"); + } + + _raProperties.setPort(port); + } + + /** + * Get the connection url + * + * @return The value + */ + public String getPath() + { + if (_log.isTraceEnabled()) + { + _log.trace("getPath()"); + } + + return _raProperties.getPath(); + } + + /** + * Set the client ID + * + * @param path The path + */ + public void setPath(final String path) + { + if (_log.isTraceEnabled()) + { + _log.trace("setPath(" + path + ")"); + } + + _raProperties.setPath(path); + } + + /** + * Get the connection url + * + * @return The value + */ + public String getConnectionURL() + { + if (_log.isTraceEnabled()) + { + _log.trace("getConnectionURL()"); + } + + return _raProperties.getConnectionURL(); + } + + /** + * Set the client ID + * + * @param connectionURL The connection url + */ + public void setConnectionURL(final String connectionURL) + { + if (_log.isTraceEnabled()) + { + _log.trace("setConnectionURL(" + connectionURL + ")"); + } + + _raProperties.setConnectionURL(connectionURL); + } + + /** + * Get the transaction manager locator class + * + * @return The value + */ + public String getTransactionManagerLocatorClass() + { + if (_log.isTraceEnabled()) + { + _log.trace("getTransactionManagerLocatorClass()"); + } + + return _raProperties.getTransactionManagerLocatorClass(); + } + + /** + * Set the transaction manager locator class + * + * @param locator The transaction manager locator class + */ + public void setTransactionManagerLocatorClass(final String locator) + { + if (_log.isTraceEnabled()) + { + _log.trace("setTransactionManagerLocatorClass(" + locator + ")"); + } + + _raProperties.setTransactionManagerLocatorClass(locator); + } + + /** + * Get the transaction manager locator method + * + * @return The value + */ + public String getTransactionManagerLocatorMethod() + { + if (_log.isTraceEnabled()) + { + _log.trace("getTransactionManagerLocatorMethod()"); + } + + return _raProperties.getTransactionManagerLocatorMethod(); + } + + /** + * Set the transaction manager locator method + * + * @param method The transaction manager locator method + */ + public void setTransactionManagerLocatorMethod(final String method) + { + if (_log.isTraceEnabled()) + { + _log.trace("setTransactionManagerLocatorMethod(" + method + ")"); + } + + _raProperties.setTransactionManagerLocatorMethod(method); + } + + /** + * Get the use XA flag + * + * @return The value + */ + public Boolean getUseLocalTx() + { + if (_log.isTraceEnabled()) + { + _log.trace("getUseLocalTx()"); + } + + return _raProperties.getUseLocalTx(); + } + + /** + * Set the use XA flag + * + * @param localTx The value + */ + public void setUseLocalTx(final Boolean localTx) + { + if (_log.isTraceEnabled()) + { + _log.trace("setUseLocalTx(" + localTx + ")"); + } + + _raProperties.setUseLocalTx(localTx); + } + + public Integer getSetupAttempts() + { + if (_log.isTraceEnabled()) + { + _log.trace("getSetupAttempts()"); + } + return _raProperties.getSetupAttempts(); + } + + public void setSetupAttempts(Integer setupAttempts) + { + if (_log.isTraceEnabled()) + { + _log.trace("setSetupAttempts(" + setupAttempts + ")"); + } + _raProperties.setSetupAttempts(setupAttempts); + } + + public Long getSetupInterval() + { + if (_log.isTraceEnabled()) + { + _log.trace("getSetupInterval()"); + } + return _raProperties.getSetupInterval(); + } + + public void setSetupInterval(Long interval) + { + if (_log.isTraceEnabled()) + { + _log.trace("setSetupInterval(" + interval + ")"); + } + _raProperties.setSetupInterval(interval); + } + + /** + * Indicates whether some other object is "equal to" this one. + * + * @param obj Object with which to compare + * @return True if this object is the same as the obj argument; false otherwise. + */ + public boolean equals(final Object obj) + { + if (obj == null) + { + return false; + } + + if (obj instanceof QpidResourceAdapter) + { + return _raProperties.equals(((QpidResourceAdapter)obj).getProperties()); + } + else + { + return false; + } + } + + /** + * Return the hash code for the object + * + * @return The hash code + */ + public int hashCode() + { + return _raProperties.hashCode(); + } + + /** + * Get the work manager + * + * @return The manager + */ + public WorkManager getWorkManager() + { + if (_log.isTraceEnabled()) + { + _log.trace("getWorkManager()"); + } + + if (_ctx == null) + { + return null; + } + + return _ctx.getWorkManager(); + } + + public XASession createXASession(final XAConnectionImpl connection) + throws Exception + { + final XASession result = connection.createXASession() ; + if (_log.isDebugEnabled()) + { + _log.debug("Using session " + Util.asString(result)); + } + return result ; + } + + public Session createSession(final AMQConnection connection, + final int ackMode, + final boolean useLocalTx, + final Integer prefetchLow, + final Integer prefetchHigh) throws Exception + { + Session result; + + if (prefetchLow == null) + { + result = connection.createSession(useLocalTx, ackMode) ; + } + else if (prefetchHigh == null) + { + result = connection.createSession(useLocalTx, ackMode, prefetchLow) ; + } + else + { + result = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow) ; + } + + if (_log.isDebugEnabled()) + { + _log.debug("Using session " + Util.asString(result)); + } + + return result; + + } + + /** + * Get the resource adapter properties + * + * @return The properties + */ + protected QpidRAProperties getProperties() + { + if (_log.isTraceEnabled()) + { + _log.trace("getProperties()"); + } + + return _raProperties; + } + + /** + * Setup the factory + */ + protected void setup() throws QpidRAException + { + _defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties); + } + + + public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException + { + if (!_configured.getAndSet(true)) + { + try + { + setup(); + } + catch (QpidRAException e) + { + throw new ResourceException("Unable to create activation", e); + } + } + return _defaultAMQConnectionFactory; + } + + public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) + throws QpidRAException + { + try + { + return createFactory(overrideProperties); + } + catch (final URLSyntaxException urlse) + { + throw new QpidRAException("Unexpected exception creating connection factory", urlse) ; + } + } + + public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams, + final Map<String, Object> overrideConnectionParams) + { + Map<String, Object> map = new HashMap<String, Object>(); + + if(connectionParams != null) + { + map.putAll(connectionParams); + } + if(overrideConnectionParams != null) + { + for (Map.Entry<String, Object> stringObjectEntry : overrideConnectionParams.entrySet()) + { + map.put(stringObjectEntry.getKey(), stringObjectEntry.getValue()); + } + } + return map; + } + + private void locateTM() + { + if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null) + { + + String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";"); + String locatorMethods[] = _raProperties.getTransactionManagerLocatorMethod().split(";"); + + for (int i = 0 ; i < locatorClasses.length; i++) + { + _tm = Util.locateTM(locatorClasses[i], locatorMethods[i]); + if (_tm != null) + { + break; + } + } + + + } + + if (_tm == null) + { + _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod"); + _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases."); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("TM located = " + _tm); + } + } + } + + + private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties) + throws URLSyntaxException, QpidRAException + { + final String overrideURL = overrideProperties.getConnectionURL() ; + final String url = overrideURL != null ? overrideURL : _raProperties.getConnectionURL() ; + + final String overrideClientID = overrideProperties.getClientId() ; + final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId()) ; + + final String overrideDefaultPassword = overrideProperties.getPassword() ; + final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword()) ; + + final String overrideDefaultUsername = overrideProperties.getUserName() ; + final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName()) ; + + final String overrideHost = overrideProperties.getHost() ; + final String host = (overrideHost != null ? overrideHost : _raProperties.getHost()) ; + + final Integer overridePort = overrideProperties.getPort() ; + final Integer port = (overridePort != null ? overridePort : _raProperties.getPort()) ; + + final String overridePath = overrideProperties.getPath() ; + final String path = (overridePath != null ? overridePath : _raProperties.getPath()) ; + + final AMQConnectionFactory cf ; + + if (url != null) + { + cf = new AMQConnectionFactory(url) ; + + if (clientID != null) + { + cf.getConnectionURL().setClientName(clientID) ; + } + } + else + { + // create a URL to force the connection details + if ((host == null) || (port == null) || (path == null)) + { + throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified") ; + } + final String username = (defaultUsername != null ? defaultUsername : "") ; + final String password = (defaultPassword != null ? defaultPassword : "") ; + final String client = (clientID != null ? clientID : "") ; + + final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ; + if (_log.isDebugEnabled()) + { + _log.debug("Initialising connectionURL to " + newurl) ; + } + + cf = new AMQConnectionFactory(newurl) ; + } + + return cf ; + } +} |