summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWeston M. Price <wprice@apache.org>2012-05-03 14:23:03 +0000
committerWeston M. Price <wprice@apache.org>2012-05-03 14:23:03 +0000
commita6c3d4d239970810b979abd6bb4f00a206cf299d (patch)
tree3db393656f2cba8b5bd3416850ed7915666e5d61
parent353418fa6793c7c6da94fef2c08500a965b800fc (diff)
downloadqpid-python-a6c3d4d239970810b979abd6bb4f00a206cf299d.tar.gz
QPID-3878:
QpidActivation should use connection per inbound listener Added ability to configure inbound listener multiplex behavior Added new property to ra.xml Updated README.txt to reflect changes Added Serialization unit tests for ActivationSpec and ResourceAdapter Contributions from Kevin Conner git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1333473 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/jca/README.txt50
-rw-r--r--qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java4
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java20
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java53
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java25
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java501
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java29
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java339
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java139
-rwxr-xr-xqpid/java/jca/src/main/resources/META-INF/ra.xml7
-rw-r--r--qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java45
-rw-r--r--qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java13
12 files changed, 641 insertions, 584 deletions
diff --git a/qpid/java/jca/README.txt b/qpid/java/jca/README.txt
index 6fdd028349..5a7c467685 100644
--- a/qpid/java/jca/README.txt
+++ b/qpid/java/jca/README.txt
@@ -7,7 +7,8 @@ for JEE integration between EE applications and AMQP 0.10 message brokers.
The Qpid JCA adapter provides both outbound and inbound connectivity and
exposes a variety of options to fine tune your messaging applications. Currently
-the adapter only supports C++ based brokers and has only been tested with Apache Qpid C++ broker.
+the adapter only supports C++ based brokers and has only been tested with Apache
+Qpid C++ broker.
The following document explains general configuration information for the Qpid JCA RA. Details for
specific application server platforms are provided in separate README files typically designated as
@@ -38,22 +39,22 @@ encouraged. Similarly, familiarity with the JCA 1.5 specification is encouraged
The ResourceAdapter JavaBean
============================
-The ResourceAdapter JavaBean provides global configuration options for both inbound and outbound connectivity.
-The set of ResourceAdapter properties are described below. The ResourceAdapter properties can be found in the META-INF/ra.xml
-deployment descriptor which is provided with the adapter. Note, deploying a ResourceAdapter, ManagedConnectionFactory
-or ActivationSpec is application server specific. As such, this document provides an explanation of these properties
-but not how they are configured as this is environment specific.
+The ResourceAdapter JavaBean provides global configuration options for both inbound and outbound
+connectivity. The set of ResourceAdapter properties are described below. The ResourceAdapter properties
+can be found in the META-INF/ra.xml deployment descriptor which is provided with the adapter. Note,
+deploying a ResourceAdapter, ManagedConnectionFactory or ActivationSpec is application server specific.
+As such, this document provides an explanation of these properties but not how they are configured.
ResourceAdapter JavaBean Properties
===================================
-ClientID
+ClientId
The unique client identifier. From the JMS API this is used only in the context of durable subscriptions.
Default: client_id
SetupAttempts
- The number of attempts the ResourceAdapter will make to successfully setup an inbound activation on deployment, or when an exception
- occurs at runtime.
+ The number of attempts the ResourceAdapter will make to successfully setup an inbound activation on deployment,
+ or when an exception occurs at runtime.
Default: 5
SetupInterval
@@ -92,9 +93,11 @@ TransactionManagerLocatorMethod
server specific as such, no default is provided.
Default:none
-Note, both the TransactionManagerLocatorClass and the TransactionManagerLocatorMethod
-properties must be set. While application servers typically provide a mechanism to do this in the form of
-a specific deployment descriptor, or GUI console, the ra.xml file can also be modified directly.
+UseConnectionPerHandler
+ The Apache C++ Broker multiplexes on the physical connection rather than the session. As a result, performance
+ improvements can be gained by allocating and assigning a connection per inbound listener. The alternative is
+ to share a connection across each handler for a given endpoint (MDB).
+Default:true
The ManagedConnectionFactory JavaBean
=====================================
@@ -176,11 +179,11 @@ Both these administered objects have properities to support configuration and de
QpidQueue/QpidTopic
====================
- The QpidQueue/QpidTopic AdminObjects allow a developer, deployer or adminstrator to create destinations
- (queues or topic) and bind these destinations into JNDI. Only one property is required:
+The QpidQueue and QpidTopic AdminObjects allow binding JMS destintations into the JEE JNDI namespace. Both
+objects support one property:
DestinationAddress
- The address string of the destination. Please see the Qpid Java JMS client documentation for valid values.
+ The address string of the destination. Please see the Qpid JMS client documentation for valid values.
Example:
DestinationAddress=hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}
@@ -188,13 +191,13 @@ Example:
QpidConnectionFactoryProxy
==========================
- The QpidConnectionFactoryProxy allows for a non-JCA ConnectionFactory to be bound into the JNDI tree. This
- ConnectionFactory can in turn be used outside of the application server. Typically a ConnectionFactory of
- this sort is used by Java Swing or other non-managed clients not requiring JCA. One one property is
- required:
+The QpidConnectionFactoryProxy allows for a non-JCA ConnectionFactory to be bound into the JNDI tree. This
+ConnectionFactory can in turn be used outside of the application server. Typically a ConnectionFactory of
+this sort is used by Swing or other non-managed clients not requiring JCA. The QpidConnectionFactoryProxy
+provides one property
ConnectionURL
- This is the url used to configure the connection factory. Please see the Qpid Java Client documentation for
+ This is the url used to configure the connection factory. Please see the Qpid JMS client documentation for
further details.
Example:
@@ -205,10 +208,11 @@ Transaction Support
The Qpid JCA Resource Adapter provides three levels of transaction support: XA, LocalTransactions and NoTransaction.
Typical usage of the Qpid JCA Resource adapter implies the use of XA transactions, though there are certain scenarios
where this is not preferred. Transaction support configuration is application server specific and as such, is explained
-in the corresponding documentation for each supported application server. Current limitations with XA are listed
-below:
+in the corresponding documentation for each supported application server.
-1)XARecovery is currently only supported for JBoss EAP 5.x and is not supported for clustered broker configurations.
+XA recovery, that is being able to recover 'in-doubt' transactions for a given resource manager is non-standardized,
+and as such is application server specific. Currently, the Qpid JCA adapter only supports recovery for JBoss EAP 5.x
+as a separate module.
Conclusion
==========
diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java
index 42a9ab6f60..75e0acab79 100644
--- a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java
+++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java
@@ -66,10 +66,6 @@ public class QpidHelloListenerBean implements MessageListener
try
{
- _log.info(message.getJMSDestination().getClass().getName());
-
- javax.jms.Queue queue = (javax.jms.Queue)message.getJMSDestination();
- _log.info("QueueName is: " + queue.getQueueName());
if(message instanceof TextMessage)
{
String content = ((TextMessage)message).getText();
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
index 1ded1a8db2..69320575b0 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
@@ -31,10 +31,8 @@ import org.slf4j.LoggerFactory;
*/
public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
{
- /** Serial version UID */
private static final long serialVersionUID = -4823893873707374791L;
- /** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
private static final int DEFAULT_SETUP_ATTEMPTS = 10;
@@ -45,16 +43,14 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
private long _setupInterval = DEFAULT_SETUP_INTERVAL;
- /** Use Local TX instead of XA */
- private Boolean _localTx = false;
-
/** Class used to locate the Transaction Manager. */
private String _transactionManagerLocatorClass ;
/** Method used to locate the TM */
private String _transactionManagerLocatorMethod ;
-
+ private boolean _useConnectionPerHandler = true;
+
/**
* Constructor
*/
@@ -146,10 +142,20 @@ public class QpidRAProperties extends ConnectionFactoryProperties implements Ser
this._setupInterval = setupInterval;
}
+ public boolean isUseConnectionPerHandler()
+ {
+ return _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
@Override
public String toString()
{
- return "QpidRAProperties[localTx=" + _localTx +
+ return "QpidRAProperties[" +
", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
", setupAttempts=" + _setupAttempts +
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
index 14b5354062..96fa83ceef 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
@@ -38,8 +38,6 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
@@ -47,6 +45,8 @@ import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.inflow.QpidActivation;
import org.apache.qpid.ra.inflow.QpidActivationSpec;
import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The resource adapter for Qpid
@@ -54,43 +54,22 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class QpidResourceAdapter implements ResourceAdapter, Serializable
{
- /**
- *
- */
private static final long serialVersionUID = -2446231446818098726L;
- /**
- * The logger
- */
- private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+ private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
- /**
- * The bootstrap context
- */
private BootstrapContext _ctx;
- /**
- * The resource adapter properties
- */
private final QpidRAProperties _raProperties;
- /**
- * Have the factory been configured
- */
private final AtomicBoolean _configured;
- /**
- * The activations by activation spec
- */
private final Map<ActivationSpec, QpidActivation> _activations;
private AMQConnectionFactory _defaultAMQConnectionFactory;
private TransactionManager _tm;
- /**
- * Constructor
- */
public QpidResourceAdapter()
{
if (_log.isTraceEnabled())
@@ -514,7 +493,27 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
}
_raProperties.setSetupInterval(interval);
}
+
+ public Boolean isUseConnectionPerHandler()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isConnectionPerHandler()");
+ }
+
+ return _raProperties.isUseConnectionPerHandler();
+ }
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setConnectionPerHandler(" + connectionPerHandler + ")");
+ }
+
+ _raProperties.setUseConnectionPerHandler(connectionPerHandler);
+ }
+
/**
* Indicates whether some other object is "equal to" this one.
*
@@ -683,7 +682,8 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
private void locateTM() throws ResourceAdapterInternalException
{
- if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+ if(_raProperties.getTransactionManagerLocatorClass() != null
+ && _raProperties.getTransactionManagerLocatorMethod() != null)
{
String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
@@ -703,7 +703,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
if (_tm == null)
{
- _log.error("It wasn't possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+ _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
}
else
@@ -763,6 +763,7 @@ public class QpidResourceAdapter implements ResourceAdapter, Serializable
final String client = (clientID != null ? clientID : "") ;
final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+
if (_log.isDebugEnabled())
{
_log.debug("Initialising connectionURL to " + newurl) ;
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
index 417849fc5c..162099d1ee 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueueImpl.java
@@ -21,25 +21,15 @@
package org.apache.qpid.ra.admin;
import java.io.Externalizable;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
-import java.io.ObjectInputStream;
import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Hashtable;
-import javax.naming.Context;
-import javax.naming.Name;
import javax.naming.NamingException;
-import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
-import javax.naming.spi.ObjectFactory;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.ra.admin.AdminObjectFactory;
public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
{
@@ -101,19 +91,4 @@ public class QpidQueueImpl extends AMQQueue implements QpidQueue, Externalizable
out.writeObject(this._url);
}
- //TODO move to tests
- public static void main(String[] args) throws Exception
- {
- QpidQueueImpl q = new QpidQueueImpl();
- q.setDestinationAddress("hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}");
- ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("queue.out"));
- os.writeObject(q);
- os.close();
-
-
- ObjectInputStream is = new ObjectInputStream(new FileInputStream("queue.out"));
- q = (QpidQueueImpl)is.readObject();
- System.out.println(q);
-
- }
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
index 57edec8eee..2327512a62 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
@@ -20,114 +20,28 @@
*/
package org.apache.qpid.ra.inflow;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.XAConnectionImpl;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.ra.QpidResourceAdapter;
-import org.apache.qpid.ra.Util;
-
/**
* The activation.
*
*/
-public class QpidActivation implements ExceptionListener
+public class QpidActivation extends QpidExceptionHandler
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
- /**
- * The onMessage method
- */
- public static final Method ONMESSAGE;
-
- /**
- * The resource adapter
- */
- private final QpidResourceAdapter _ra;
-
- /**
- * The activation spec
- */
- private final QpidActivationSpec _spec;
-
- /**
- * The message endpoint factory
- */
- private final MessageEndpointFactory _endpointFactory;
-
- /**
- * Whether delivery is active
- */
- private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
-
- /**
- * The destination type
- */
- private boolean _isTopic = false;
-
- /**
- * Is the delivery transacted
- */
- private boolean _isDeliveryTransacted;
-
- private Destination _destination;
-
- /**
- * The connection
- */
- private Connection _connection;
-
private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
- private AMQConnectionFactory _factory;
-
- // Whether we are in the failure recovery loop
- private AtomicBoolean _inFailure = new AtomicBoolean(false);
-
- //Whether or not we have completed activating
- private AtomicBoolean _activated = new AtomicBoolean(false);
-
- static
- {
- try
- {
- ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
+
/**
* Constructor
*
@@ -140,97 +54,8 @@ public class QpidActivation implements ExceptionListener
final MessageEndpointFactory endpointFactory,
final QpidActivationSpec spec) throws ResourceException
{
- if (_log.isTraceEnabled())
- {
- _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
- }
-
- this._ra = ra;
- this._endpointFactory = endpointFactory;
- this._spec = spec;
- try
- {
- _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
- }
- catch (Exception e)
- {
- throw new ResourceException(e);
- }
- }
-
- /**
- * Get the activation spec
- *
- * @return The value
- */
- public QpidActivationSpec getActivationSpec()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getActivationSpec()");
- }
-
- return _spec;
- }
-
- /**
- * Get the message endpoint factory
- *
- * @return The value
- */
- public MessageEndpointFactory getMessageEndpointFactory()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getMessageEndpointFactory()");
- }
-
- return _endpointFactory;
- }
-
- /**
- * Get whether delivery is transacted
- *
- * @return The value
- */
- public boolean isDeliveryTransacted()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isDeliveryTransacted()");
- }
-
- return _isDeliveryTransacted;
- }
-
- /**
- * Get the work manager
- *
- * @return The value
- */
- public WorkManager getWorkManager()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("getWorkManager()");
- }
-
- return _ra.getWorkManager();
- }
-
- /**
- * Is the destination a topic
- *
- * @return The value
- */
- public boolean isTopic()
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("isTopic()");
- }
-
- return _isTopic;
+ super(ra, spec, endpointFactory);
+
}
/**
@@ -267,70 +92,62 @@ public class QpidActivation implements ExceptionListener
*
* @throws Exception Thrown if an error occurs
*/
- protected synchronized void setup() throws Exception
+ public synchronized void setup() throws Exception
{
_log.debug("Setting up " + _spec);
- setupCF();
-
+ setupCF();
setupDestination();
- final AMQConnection amqConnection ;
- final boolean useLocalTx = _spec.isUseLocalTx() ;
- final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
-
- if (isXA)
+
+ if(!_spec.isUseConnectionPerHandler())
{
- amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+ setupConnection();
+ _connection.setExceptionListener(this);
}
- else
- {
- amqConnection = (AMQConnection)_factory.createConnection() ;
- }
-
- amqConnection.setExceptionListener(this) ;
-
+
for (int i = 0; i < _spec.getMaxSession(); i++)
{
- Session session = null;
-
- try
- {
- if (isXA)
- {
- session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
- }
- else
- {
- session = _ra.createSession((AMQConnection)amqConnection,
- _spec.getAcknowledgeModeInt(),
- useLocalTx,
- _spec.getPrefetchLow(),
- _spec.getPrefetchHigh());
- }
-
- _log.debug("Using session " + Util.asString(session));
- QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
- handler.setup();
- _handlers.add(handler);
- }
- catch (Exception e)
- {
- try
- {
- amqConnection.close() ;
- }
- catch (Exception e2)
- {
- _log.trace("Ignored error closing connection", e2);
- }
-
- throw e;
- }
+ try
+ {
+ QpidMessageHandler handler = null;
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM());
+ }
+ else
+ {
+ handler = new QpidMessageHandler(_ra, _spec, _endpointFactory, _ra.getTM(), _connection);
+ }
+
+ handler.start();
+ _handlers.add(handler);
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ if(_connection != null)
+ {
+ this._connection.close();
+ }
+ }
+ catch (Exception e2)
+ {
+ _log.trace("Ignored error closing connection", e2);
+ }
+
+ throw e;
+
+ }
+
+ }
+
+ if(!_spec.isUseConnectionPerHandler())
+ {
+ this._connection.start();
+ _activated.set(true);
}
- amqConnection.start() ;
- this._connection = amqConnection ;
- _activated.set(true);
-
_log.debug("Setup complete " + this);
}
@@ -340,136 +157,17 @@ public class QpidActivation implements ExceptionListener
protected synchronized void teardown()
{
_log.debug("Tearing down " + _spec);
-
- try
- {
- if (_connection != null)
- {
- _connection.stop();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error stopping connection " + Util.asString(_connection), t);
- }
+
+ super.teardown();
for (QpidMessageHandler handler : _handlers)
{
- handler.teardown();
+ handler.stop();
}
- try
- {
- if (_connection != null)
- {
- _connection.close();
- }
- }
- catch (Throwable t)
- {
- _log.debug("Error closing connection " + Util.asString(_connection), t);
- }
- if (_spec.isHasBeenUpdated())
- {
- _factory = null;
- }
_log.debug("Tearing down complete " + this);
}
- protected void setupCF() throws Exception
- {
- if (_spec.isHasBeenUpdated())
- {
- _factory = _ra.createAMQConnectionFactory(_spec);
- }
- else
- {
- _factory = _ra.getDefaultAMQConnectionFactory();
- }
- }
-
- public Destination getDestination()
- {
- return _destination;
- }
-
- protected void setupDestination() throws Exception
- {
-
- String destinationName = _spec.getDestination();
- String destinationTypeString = _spec.getDestinationType();
-
- if (_spec.isUseJNDI())
- {
- Context ctx = new InitialContext();
- _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
- if (_log.isTraceEnabled())
- {
- _log.trace("setupDestination(" + ctx + ")");
- }
-
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
-
- Class<? extends Destination> destinationType;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- destinationType = Topic.class;
- _isTopic = true;
- }
- else
- {
- destinationType = Queue.class;
- }
-
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- destinationType.getName());
- _destination = Util.lookup(ctx, destinationName, destinationType);
-
- }
- else
- {
- _log.debug("Destination type not defined");
- _log.debug("Retrieving destination " + destinationName +
- " of type " +
- Destination.class.getName());
-
- _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
- else
- {
- _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- _log.debug("Destination type defined as " + destinationTypeString);
- final boolean match ;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- match = (_destination instanceof Topic) ;
- _isTopic = true;
- }
- else
- {
- match = (_destination instanceof Queue) ;
- }
- if (!match)
- {
- throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
- }
- }
- else
- {
- _isTopic = !(_destination instanceof Queue) ;
- }
- }
-
- _log.debug("Got destination " + _destination + " from " + destinationName);
- }
-
/**
* Get a string representation
*
@@ -492,94 +190,7 @@ public class QpidActivation implements ExceptionListener
return buffer.toString();
}
- public void onException(final JMSException jmse)
- {
- if(_activated.get())
- {
- handleFailure(jmse) ;
- }
- else
- {
- _log.warn("Received JMSException: " + jmse + " while endpoint was not activated.");
- }
- }
-
- /**
- * Handles any failure by trying to reconnect
- *
- * @param failure the reason for the failure
- */
- public void handleFailure(Throwable failure)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.warn("Failure in Qpid activation " + _spec, failure);
- }
- int reconnectCount = 0;
- int setupAttempts = _spec.getSetupAttempts();
- long setupInterval = _spec.getSetupInterval();
-
- // Only enter the failure loop once
- if (_inFailure.getAndSet(true))
- return;
- try
- {
- while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
- {
- teardown();
-
- try
- {
- Thread.sleep(setupInterval);
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted trying to reconnect " + _spec, e);
- break;
- }
-
- _log.info("Attempting to reconnect " + _spec);
- try
- {
- setup();
- _log.info("Reconnected with Qpid");
- break;
- }
- catch (Throwable t)
- {
- if(doesNotExist(failure))
- {
- _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
- }
- else
- {
- _log.error("Unable to reconnect " + _spec, t);
- }
- }
- ++reconnectCount;
- }
- }
- finally
- {
- // Leaving failure recovery loop
- _inFailure.set(false);
- }
- }
-
- /**
- * Check to see if the failure represents a missing endpoint
- * @param failure The failure.
- * @return true if it represents a missing endpoint, false otherwise
- */
- private boolean doesNotExist(final Throwable failure)
- {
- return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
- }
-
+
/**
* Handles the setup
*/
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
index 5f4e2dcf6b..3d9a88adb5 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
@@ -28,10 +28,10 @@ import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.ra.ConnectionFactoryProperties;
import org.apache.qpid.ra.QpidResourceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The activation spec
@@ -88,6 +88,8 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
// undefined by default, default is specified at the RA level in QpidRAProperties
private Long _setupInterval;
+ private Boolean _useConnectionPerHandler;
+
/**
* Constructor
*/
@@ -544,6 +546,16 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
this._setupInterval = setupInterval;
}
+ public Boolean isUseConnectionPerHandler()
+ {
+ return (_useConnectionPerHandler == null) ? _ra.isUseConnectionPerHandler() : _useConnectionPerHandler;
+ }
+
+ public void setUseConnectionPerHandler(Boolean connectionPerHandler)
+ {
+ this._useConnectionPerHandler = connectionPerHandler;
+ }
+
/**
* Validate
* @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -561,6 +573,7 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
}
}
+
/**
* Get a string representation
* @return The value
@@ -573,23 +586,30 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
buffer.append("ra=").append(_ra);
buffer.append(" destination=").append(_destination);
buffer.append(" destinationType=").append(_destinationType);
+
if (_messageSelector != null)
{
buffer.append(" selector=").append(_messageSelector);
}
+
buffer.append(" ack=").append(getAcknowledgeMode());
buffer.append(" durable=").append(_subscriptionDurability);
buffer.append(" clientID=").append(getClientId());
+
if (_subscriptionName != null)
{
buffer.append(" subscription=").append(_subscriptionName);
}
+
buffer.append(" user=").append(getUserName());
+
if (getPassword() != null)
{
- buffer.append(" password=").append("****");
+ buffer.append(" password=").append("********");
}
+
buffer.append(" maxSession=").append(_maxSession);
+
if (_prefetchLow != null)
{
buffer.append(" prefetchLow=").append(_prefetchLow);
@@ -598,7 +618,10 @@ public class QpidActivationSpec extends ConnectionFactoryProperties implements A
{
buffer.append(" prefetchHigh=").append(_prefetchHigh);
}
+
+ buffer.append(" connectionPerHandler=").append(isUseConnectionPerHandler());
buffer.append(')');
+
return buffer.toString();
}
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
new file mode 100644
index 0000000000..8775362eb5
--- /dev/null
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
@@ -0,0 +1,339 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class QpidExceptionHandler implements ExceptionListener
+{
+ private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class);
+
+ public static final Method ONMESSAGE;
+
+ protected final MessageEndpointFactory _endpointFactory;
+
+ protected Connection _connection;
+
+ protected ConnectionFactory _factory;
+
+ protected Destination _destination;
+
+ protected final QpidResourceAdapter _ra;
+
+ protected final QpidActivationSpec _spec;
+
+ protected boolean _isDeliveryTransacted;
+
+ protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+ protected boolean _isTopic = false;
+
+ // Whether we are in the failure recovery loop
+ protected AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+ //Whether or not we have completed activating
+ protected AtomicBoolean _activated = new AtomicBoolean(false);
+
+ static
+ {
+ try
+ {
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public abstract void setup() throws Exception;
+ public abstract void start() throws Exception;
+ public abstract void stop();
+
+ protected QpidExceptionHandler(QpidResourceAdapter ra,
+ QpidActivationSpec spec,
+ MessageEndpointFactory endpointFactory) throws ResourceException
+ {
+ this._ra = ra;
+ this._spec = spec;
+ this._endpointFactory = endpointFactory;
+
+ try
+ {
+ _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException(e);
+ }
+
+
+ }
+
+ public void onException(JMSException e)
+ {
+ if(_activated.get())
+ {
+ handleFailure(e) ;
+ }
+ else
+ {
+ _log.warn("Received JMSException: " + e + " while endpoint was not activated.");
+ }
+ }
+
+ /**
+ * Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
+ */
+ public void handleFailure(Throwable failure)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.warn("Failure in Qpid activation " + _spec, failure);
+ }
+ int reconnectCount = 0;
+ int setupAttempts = _spec.getSetupAttempts();
+ long setupInterval = _spec.getSetupInterval();
+
+ // Only enter the failure loop once
+ if (_inFailure.getAndSet(true))
+ return;
+ try
+ {
+ while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+ {
+ teardown();
+
+ try
+ {
+ Thread.sleep(setupInterval);
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted trying to reconnect " + _spec, e);
+ break;
+ }
+
+ _log.info("Attempting to reconnect " + _spec);
+ try
+ {
+ setup();
+ _log.info("Reconnected with Qpid");
+ break;
+ }
+ catch (Throwable t)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + _spec.getDestination());
+ }
+ else
+ {
+ _log.error("Unable to reconnect " + _spec, t);
+ }
+ }
+ ++reconnectCount;
+ }
+ }
+ finally
+ {
+ // Leaving failure recovery loop
+ _inFailure.set(false);
+ }
+ }
+
+ /**
+ * Check to see if the failure represents a missing endpoint
+ * @param failure The failure.
+ * @return true if it represents a missing endpoint, false otherwise
+ */
+ protected boolean doesNotExist(final Throwable failure)
+ {
+ return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+ }
+
+ protected boolean isXA()
+ {
+ return _isDeliveryTransacted && !_spec.isUseLocalTx();
+ }
+
+ protected void setupConnection() throws Exception
+ {
+ this._connection = (isXA()) ? ((XAConnectionFactory)_factory).createXAConnection() : _factory.createConnection();
+ }
+
+ protected synchronized void teardown()
+ {
+ _log.debug("Tearing down " + _spec);
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error stopping connection " + Util.asString(_connection), t);
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error closing connection " + Util.asString(_connection), t);
+ }
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = null;
+ }
+ _log.debug("Tearing down complete " + this);
+ }
+
+ protected void setupCF() throws Exception
+ {
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = _ra.createAMQConnectionFactory(_spec);
+ }
+ else
+ {
+ _factory = _ra.getDefaultAMQConnectionFactory();
+ }
+ }
+
+ protected void setupDestination() throws Exception
+ {
+
+ String destinationName = _spec.getDestination();
+ String destinationTypeString = _spec.getDestinationType();
+
+ if (_spec.isUseJNDI())
+ {
+ Context ctx = new InitialContext();
+ _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setupDestination(" + ctx + ")");
+ }
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+
+ Class<? extends Destination> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ destinationType = Topic.class;
+ _isTopic = true;
+ }
+ else
+ {
+ destinationType = Queue.class;
+ }
+
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ destinationType.getName());
+ _destination = Util.lookup(ctx, destinationName, destinationType);
+
+ }
+ else
+ {
+ _log.debug("Destination type not defined");
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ Destination.class.getName());
+
+ _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+ else
+ {
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+ final boolean match ;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ match = (_destination instanceof Topic) ;
+ _isTopic = true;
+ }
+ else
+ {
+ match = (_destination instanceof Queue) ;
+ }
+ if (!match)
+ {
+ throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+ }
+ }
+ else
+ {
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+
+ _log.debug("Got destination " + _destination + " from " + destinationName);
+ }
+
+
+
+}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
index 473efab31f..a02adf0dad 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.ra.inflow;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,6 +36,9 @@ import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory;
* The message handler
*
*/
-public class QpidMessageHandler implements MessageListener
+public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
- /**
- * The session
- */
- private final Session _session;
-
private MessageConsumer _consumer;
- /**
- * The endpoint
- */
private MessageEndpoint _endpoint;
- private final QpidActivation _activation;
-
- private boolean _useLocalTx;
-
- private boolean _transacted;
+ private Session _session;
private final TransactionManager _tm;
- public QpidMessageHandler(final QpidActivation activation,
- final TransactionManager tm,
- final Session session)
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm,
+ final Connection connection) throws ResourceException
{
- this._activation = activation;
- this._session = session;
+ super(ra, spec, endpointFactory);
this._tm = tm;
+ this._connection = connection;
}
-
+
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm) throws ResourceException
+ {
+ super(ra, spec, endpointFactory);
+ this._tm = tm;
+ }
+
public void setup() throws Exception
{
if (_log.isTraceEnabled())
{
_log.trace("setup()");
}
-
- QpidActivationSpec spec = _activation.getActivationSpec();
- String selector = spec.getMessageSelector();
-
+
+ setupCF();
+ setupDestination();
+ String selector = _spec.getMessageSelector();
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ setupConnection();
+ _connection.setExceptionListener(this);
+ }
+
+ if(isXA())
+ {
+ _session = _ra.createXASession((XAConnectionImpl)_connection);
+ }
+ else
+ {
+ _session = _ra.createSession((AMQConnection)_connection,
+ _spec.getAcknowledgeModeInt(),
+ _spec.isUseLocalTx(),
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
// Create the message consumer
- if (_activation.isTopic())
+ if (_isTopic)
{
- final Topic topic = (Topic) _activation.getDestination();
- final String subscriptionName = spec.getSubscriptionName();
- if (spec.isSubscriptionDurable())
- _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ final Topic topic = (Topic) _destination;
+ final String subscriptionName = _spec.getSubscriptionName();
+
+ if (_spec.isSubscriptionDurable())
+ {
+ _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ }
else
- _consumer = _session.createConsumer(topic, selector) ;
+ {
+ _consumer = _session.createConsumer(topic, selector) ;
+ }
}
else
{
- final Queue queue = (Queue) _activation.getDestination();
+ final Queue queue = (Queue) _destination;
_consumer = _session.createConsumer(queue, selector);
}
- // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
- MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
- _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
- _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
- if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+ if (isXA())
{
final XAResource xaResource = ((XASession)_session).getXAResource() ;
- _endpoint = endpointFactory.createEndpoint(xaResource);
+ _endpoint = _endpointFactory.createEndpoint(xaResource);
}
else
{
- _endpoint = endpointFactory.createEndpoint(null);
+ _endpoint = _endpointFactory.createEndpoint(null);
}
_consumer.setMessageListener(this);
+ _connection.start();
+ _activated.set(true);
}
/**
@@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener
*/
public void teardown()
{
- if (_log.isTraceEnabled())
- {
- _log.trace("teardown()");
- }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("teardown()");
+ }
+ super.teardown();
+
try
{
if (_endpoint != null)
@@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener
try
{
- if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+ if (_spec.getTransactionTimeout() > 0 && _tm != null)
{
- _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+ _tm.setTransactionTimeout(_spec.getTransactionTimeout());
}
_endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
beforeDelivery = true;
- if(_transacted)
+ if(isXA())
{
message.acknowledge();
}
((MessageListener)_endpoint).onMessage(message);
- if (_transacted && (_tm.getTransaction() != null))
+ if (isXA() && (_tm.getTransaction() != null))
{
final int status = _tm.getStatus() ;
final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
|| status == Status.STATUS_ROLLING_BACK
|| status == Status.STATUS_ROLLEDBACK;
+
if (rollback)
{
_session.recover() ;
@@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
return;
}
- if (_useLocalTx)
+ if (!isXA() && _spec.isUseLocalTx())
{
_session.commit();
}
@@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
}
}
- if (_useLocalTx || !_activation.isDeliveryTransacted())
+ if (!isXA() && _spec.isUseLocalTx())
{
try
{
@@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener
}
}
+
+ public void start() throws Exception
+ {
+ _deliveryActive.set(true);
+ setup();
+ }
+
+ public void stop()
+ {
+ _deliveryActive.set(false);
+ teardown();
+ }
}
diff --git a/qpid/java/jca/src/main/resources/META-INF/ra.xml b/qpid/java/jca/src/main/resources/META-INF/ra.xml
index 01f5eceecf..a9374f52d7 100755
--- a/qpid/java/jca/src/main/resources/META-INF/ra.xml
+++ b/qpid/java/jca/src/main/resources/META-INF/ra.xml
@@ -109,6 +109,13 @@
<config-property-type>java.lang.String</config-property-type>
<config-property-value>amqp://anonymous:passwd@client/test?brokerlist='tcp://localhost?sasl_mechs='PLAIN''</config-property-value>
</config-property>
+
+ <config-property>
+ <description>Use a JMS Connection per MessageHandler</description>
+ <config-property-name>UseConnectionPerHandler</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value>true</config-property-value>
+ </config-property>
<outbound-resourceadapter>
<connection-definition>
diff --git a/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java b/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java
new file mode 100644
index 0000000000..e811427223
--- /dev/null
+++ b/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidActivationSpecTest.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import javax.resource.spi.ResourceAdapterInternalException;
+
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
+
+import junit.framework.TestCase;
+
+public class QpidActivationSpecTest extends TestCase
+{
+
+ public void testActivationSpecBasicSerialization() throws Exception
+ {
+ QpidActivationSpec spec = new QpidActivationSpec();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(spec);
+ oos.close();
+ assertTrue(out.toByteArray().length > 0);
+ }
+
+}
diff --git a/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java b/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
index ccad952d64..a6788a72c5 100644
--- a/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
+++ b/qpid/java/jca/src/test/java/org/apache/qpid/ra/QpidResourceAdapterTest.java
@@ -22,6 +22,9 @@ package org.apache.qpid.ra;
import javax.resource.spi.ResourceAdapterInternalException;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
import junit.framework.TestCase;
@@ -62,4 +65,14 @@ public class QpidResourceAdapterTest extends TestCase
}
+ public void testResourceAdapterBasicSerialization() throws Exception
+ {
+
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(ra);
+ oos.close();
+ assertTrue(out.toByteArray().length > 0);
+ }
}