summaryrefslogtreecommitdiff
path: root/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java')
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java820
1 files changed, 820 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
new file mode 100644
index 0000000000..d56f520db4
--- /dev/null
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
@@ -0,0 +1,820 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.inflow.QpidActivation;
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The resource adapter for Qpid
+ *
+ */
+public class QpidResourceAdapter implements ResourceAdapter, Serializable
+{
+ /**
+ *
+ */
+ private static final long serialVersionUID = -2446231446818098726L;
+
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+
+ /**
+ * The bootstrap context
+ */
+ private BootstrapContext _ctx;
+
+ /**
+ * The resource adapter properties
+ */
+ private final QpidRAProperties _raProperties;
+
+ /**
+ * Have the factory been configured
+ */
+ private final AtomicBoolean _configured;
+
+ /**
+ * The activations by activation spec
+ */
+ private final Map<ActivationSpec, QpidActivation> _activations;
+
+ private AMQConnectionFactory _defaultAMQConnectionFactory;
+
+ private TransactionManager _tm;
+
+ /**
+ * Constructor
+ */
+ public QpidResourceAdapter()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor()");
+ }
+
+ _raProperties = new QpidRAProperties();
+ _configured = new AtomicBoolean(false);
+ _activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>();
+ }
+
+ public TransactionManager getTM()
+ {
+ return _tm;
+ }
+
+ /**
+ * Endpoint activation
+ *
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException
+ {
+ if (!_configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (QpidRAException e)
+ {
+ throw new ResourceException("Unable to create activation", e);
+ }
+ }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
+ }
+
+ QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec)spec);
+ _activations.put(spec, activation);
+ activation.start();
+ }
+
+ /**
+ * Endpoint deactivation
+ *
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ */
+ public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")");
+ }
+
+ QpidActivation activation = _activations.remove(spec);
+ if (activation != null)
+ {
+ activation.stop();
+ }
+ }
+
+ /**
+ * Get XA resources
+ *
+ * @param specs The activation specs
+ * @return The XA resources
+ * @throws ResourceException Thrown if an error occurs or unsupported
+ */
+ public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResources(" + specs + ")");
+ }
+
+ return null;
+ }
+
+ /**
+ * Start
+ *
+ * @param ctx The bootstrap context
+ * @throws ResourceAdapterInternalException
+ * Thrown if an error occurs
+ */
+ public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start(" + ctx + ")");
+ }
+
+ locateTM();
+
+ this._ctx = ctx;
+
+ _log.info("Qpid resource adapter started");
+ }
+
+ /**
+ * Stop
+ */
+ public void stop()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ for (Map.Entry<ActivationSpec, QpidActivation> entry : _activations.entrySet())
+ {
+ try
+ {
+ entry.getValue().stop();
+ }
+ catch (Exception ignored)
+ {
+ _log.debug("Ignored", ignored);
+ }
+ }
+
+ _activations.clear();
+
+ _log.info("Qpid resource adapter stopped");
+ }
+
+ /**
+ * Get the user name
+ *
+ * @return The value
+ */
+ public String getDefaultUserName()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUserName()");
+ }
+
+ return _raProperties.getUserName();
+ }
+
+ /**
+ * Set the user name
+ *
+ * @param userName The value
+ */
+ public void setDefaultUserName(final String userName)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUserName(" + userName + ")");
+ }
+
+ _raProperties.setUserName(userName);
+ }
+
+ /**
+ * Get the password
+ *
+ * @return The value
+ */
+ public String getDefaultPassword()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPassword()");
+ }
+
+ return _raProperties.getPassword();
+ }
+
+ /**
+ * Set the password
+ *
+ * @param password The value
+ */
+ public void setDefaultPassword(final String password)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPassword(****)");
+ }
+
+ _raProperties.setPassword(password);
+ }
+
+ /**
+ * Get the client ID
+ *
+ * @return The value
+ */
+ public String getClientId()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getClientID()");
+ }
+
+ return _raProperties.getClientId();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param clientID The client id
+ */
+ public void setClientId(final String clientID)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setClientID(" + clientID + ")");
+ }
+
+ _raProperties.setClientId(clientID);
+ }
+
+ /**
+ * Get the host
+ *
+ * @return The value
+ */
+ public String getHost()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getHost()");
+ }
+
+ return _raProperties.getHost();
+ }
+
+ /**
+ * Set the host
+ *
+ * @param host The host
+ */
+ public void setHost(final String host)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setHost(" + host + ")");
+ }
+
+ _raProperties.setHost(host);
+ }
+
+ /**
+ * Get the port
+ *
+ * @return The value
+ */
+ public Integer getPort()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPort()");
+ }
+
+ return _raProperties.getPort();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param port The port
+ */
+ public void setPort(final Integer port)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPort(" + port + ")");
+ }
+
+ _raProperties.setPort(port);
+ }
+
+ /**
+ * Get the connection url
+ *
+ * @return The value
+ */
+ public String getPath()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPath()");
+ }
+
+ return _raProperties.getPath();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param path The path
+ */
+ public void setPath(final String path)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPath(" + path + ")");
+ }
+
+ _raProperties.setPath(path);
+ }
+
+ /**
+ * Get the connection url
+ *
+ * @return The value
+ */
+ public String getConnectionURL()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getConnectionURL()");
+ }
+
+ return _raProperties.getConnectionURL();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param connectionURL The connection url
+ */
+ public void setConnectionURL(final String connectionURL)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setConnectionURL(" + connectionURL + ")");
+ }
+
+ _raProperties.setConnectionURL(connectionURL);
+ }
+
+ /**
+ * Get the transaction manager locator class
+ *
+ * @return The value
+ */
+ public String getTransactionManagerLocatorClass()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorClass()");
+ }
+
+ return _raProperties.getTransactionManagerLocatorClass();
+ }
+
+ /**
+ * Set the transaction manager locator class
+ *
+ * @param locator The transaction manager locator class
+ */
+ public void setTransactionManagerLocatorClass(final String locator)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorClass(" + locator + ")");
+ }
+
+ _raProperties.setTransactionManagerLocatorClass(locator);
+ }
+
+ /**
+ * Get the transaction manager locator method
+ *
+ * @return The value
+ */
+ public String getTransactionManagerLocatorMethod()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorMethod()");
+ }
+
+ return _raProperties.getTransactionManagerLocatorMethod();
+ }
+
+ /**
+ * Set the transaction manager locator method
+ *
+ * @param method The transaction manager locator method
+ */
+ public void setTransactionManagerLocatorMethod(final String method)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorMethod(" + method + ")");
+ }
+
+ _raProperties.setTransactionManagerLocatorMethod(method);
+ }
+
+ /**
+ * Get the use XA flag
+ *
+ * @return The value
+ */
+ public Boolean getUseLocalTx()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUseLocalTx()");
+ }
+
+ return _raProperties.getUseLocalTx();
+ }
+
+ /**
+ * Set the use XA flag
+ *
+ * @param localTx The value
+ */
+ public void setUseLocalTx(final Boolean localTx)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUseLocalTx(" + localTx + ")");
+ }
+
+ _raProperties.setUseLocalTx(localTx);
+ }
+
+ public Integer getSetupAttempts()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupAttempts()");
+ }
+ return _raProperties.getSetupAttempts();
+ }
+
+ public void setSetupAttempts(Integer setupAttempts)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+ _raProperties.setSetupAttempts(setupAttempts);
+ }
+
+ public Long getSetupInterval()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupInterval()");
+ }
+ return _raProperties.getSetupInterval();
+ }
+
+ public void setSetupInterval(Long interval)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupInterval(" + interval + ")");
+ }
+ _raProperties.setSetupInterval(interval);
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * @param obj Object with which to compare
+ * @return True if this object is the same as the obj argument; false otherwise.
+ */
+ public boolean equals(final Object obj)
+ {
+ if (obj == null)
+ {
+ return false;
+ }
+
+ if (obj instanceof QpidResourceAdapter)
+ {
+ return _raProperties.equals(((QpidResourceAdapter)obj).getProperties());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /**
+ * Return the hash code for the object
+ *
+ * @return The hash code
+ */
+ public int hashCode()
+ {
+ return _raProperties.hashCode();
+ }
+
+ /**
+ * Get the work manager
+ *
+ * @return The manager
+ */
+ public WorkManager getWorkManager()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getWorkManager()");
+ }
+
+ if (_ctx == null)
+ {
+ return null;
+ }
+
+ return _ctx.getWorkManager();
+ }
+
+ public XASession createXASession(final XAConnectionImpl connection)
+ throws Exception
+ {
+ final XASession result = connection.createXASession() ;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Using session " + Util.asString(result));
+ }
+ return result ;
+ }
+
+ public Session createSession(final AMQConnection connection,
+ final int ackMode,
+ final boolean useLocalTx,
+ final Integer prefetchLow,
+ final Integer prefetchHigh) throws Exception
+ {
+ Session result;
+
+ if (prefetchLow == null)
+ {
+ result = connection.createSession(useLocalTx, ackMode) ;
+ }
+ else if (prefetchHigh == null)
+ {
+ result = connection.createSession(useLocalTx, ackMode, prefetchLow) ;
+ }
+ else
+ {
+ result = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow) ;
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Using session " + Util.asString(result));
+ }
+
+ return result;
+
+ }
+
+ /**
+ * Get the resource adapter properties
+ *
+ * @return The properties
+ */
+ protected QpidRAProperties getProperties()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getProperties()");
+ }
+
+ return _raProperties;
+ }
+
+ /**
+ * Setup the factory
+ */
+ protected void setup() throws QpidRAException
+ {
+ _defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties);
+ }
+
+
+ public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
+ {
+ if (!_configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (QpidRAException e)
+ {
+ throw new ResourceException("Unable to create activation", e);
+ }
+ }
+ return _defaultAMQConnectionFactory;
+ }
+
+ public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+ throws QpidRAException
+ {
+ try
+ {
+ return createFactory(overrideProperties);
+ }
+ catch (final URLSyntaxException urlse)
+ {
+ throw new QpidRAException("Unexpected exception creating connection factory", urlse) ;
+ }
+ }
+
+ public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
+ final Map<String, Object> overrideConnectionParams)
+ {
+ Map<String, Object> map = new HashMap<String, Object>();
+
+ if(connectionParams != null)
+ {
+ map.putAll(connectionParams);
+ }
+ if(overrideConnectionParams != null)
+ {
+ for (Map.Entry<String, Object> stringObjectEntry : overrideConnectionParams.entrySet())
+ {
+ map.put(stringObjectEntry.getKey(), stringObjectEntry.getValue());
+ }
+ }
+ return map;
+ }
+
+ private void locateTM()
+ {
+ if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+ {
+
+ String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
+ String locatorMethods[] = _raProperties.getTransactionManagerLocatorMethod().split(";");
+
+ for (int i = 0 ; i < locatorClasses.length; i++)
+ {
+ _tm = Util.locateTM(locatorClasses[i], locatorMethods[i]);
+ if (_tm != null)
+ {
+ break;
+ }
+ }
+
+
+ }
+
+ if (_tm == null)
+ {
+ _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+ _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases.");
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("TM located = " + _tm);
+ }
+ }
+ }
+
+
+ private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties)
+ throws URLSyntaxException, QpidRAException
+ {
+ final String overrideURL = overrideProperties.getConnectionURL() ;
+ final String url = overrideURL != null ? overrideURL : _raProperties.getConnectionURL() ;
+
+ final String overrideClientID = overrideProperties.getClientId() ;
+ final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId()) ;
+
+ final String overrideDefaultPassword = overrideProperties.getPassword() ;
+ final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword()) ;
+
+ final String overrideDefaultUsername = overrideProperties.getUserName() ;
+ final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName()) ;
+
+ final String overrideHost = overrideProperties.getHost() ;
+ final String host = (overrideHost != null ? overrideHost : _raProperties.getHost()) ;
+
+ final Integer overridePort = overrideProperties.getPort() ;
+ final Integer port = (overridePort != null ? overridePort : _raProperties.getPort()) ;
+
+ final String overridePath = overrideProperties.getPath() ;
+ final String path = (overridePath != null ? overridePath : _raProperties.getPath()) ;
+
+ final AMQConnectionFactory cf ;
+
+ if (url != null)
+ {
+ cf = new AMQConnectionFactory(url) ;
+
+ if (clientID != null)
+ {
+ cf.getConnectionURL().setClientName(clientID) ;
+ }
+ }
+ else
+ {
+ // create a URL to force the connection details
+ if ((host == null) || (port == null) || (path == null))
+ {
+ throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified") ;
+ }
+ final String username = (defaultUsername != null ? defaultUsername : "") ;
+ final String password = (defaultPassword != null ? defaultPassword : "") ;
+ final String client = (clientID != null ? clientID : "") ;
+
+ final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Initialising connectionURL to " + newurl) ;
+ }
+
+ cf = new AMQConnectionFactory(newurl) ;
+ }
+
+ return cf ;
+ }
+}