summaryrefslogtreecommitdiff
path: root/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java')
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java139
1 files changed, 88 insertions, 51 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
index 473efab31f..a02adf0dad 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.ra.inflow;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,6 +36,9 @@ import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory;
* The message handler
*
*/
-public class QpidMessageHandler implements MessageListener
+public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener
{
- /**
- * The logger
- */
private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
- /**
- * The session
- */
- private final Session _session;
-
private MessageConsumer _consumer;
- /**
- * The endpoint
- */
private MessageEndpoint _endpoint;
- private final QpidActivation _activation;
-
- private boolean _useLocalTx;
-
- private boolean _transacted;
+ private Session _session;
private final TransactionManager _tm;
- public QpidMessageHandler(final QpidActivation activation,
- final TransactionManager tm,
- final Session session)
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm,
+ final Connection connection) throws ResourceException
{
- this._activation = activation;
- this._session = session;
+ super(ra, spec, endpointFactory);
this._tm = tm;
+ this._connection = connection;
}
-
+
+ public QpidMessageHandler(final QpidResourceAdapter ra,
+ final QpidActivationSpec spec,
+ final MessageEndpointFactory endpointFactory,
+ final TransactionManager tm) throws ResourceException
+ {
+ super(ra, spec, endpointFactory);
+ this._tm = tm;
+ }
+
public void setup() throws Exception
{
if (_log.isTraceEnabled())
{
_log.trace("setup()");
}
-
- QpidActivationSpec spec = _activation.getActivationSpec();
- String selector = spec.getMessageSelector();
-
+
+ setupCF();
+ setupDestination();
+ String selector = _spec.getMessageSelector();
+
+ if(_spec.isUseConnectionPerHandler())
+ {
+ setupConnection();
+ _connection.setExceptionListener(this);
+ }
+
+ if(isXA())
+ {
+ _session = _ra.createXASession((XAConnectionImpl)_connection);
+ }
+ else
+ {
+ _session = _ra.createSession((AMQConnection)_connection,
+ _spec.getAcknowledgeModeInt(),
+ _spec.isUseLocalTx(),
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
// Create the message consumer
- if (_activation.isTopic())
+ if (_isTopic)
{
- final Topic topic = (Topic) _activation.getDestination();
- final String subscriptionName = spec.getSubscriptionName();
- if (spec.isSubscriptionDurable())
- _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ final Topic topic = (Topic) _destination;
+ final String subscriptionName = _spec.getSubscriptionName();
+
+ if (_spec.isSubscriptionDurable())
+ {
+ _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ }
else
- _consumer = _session.createConsumer(topic, selector) ;
+ {
+ _consumer = _session.createConsumer(topic, selector) ;
+ }
}
else
{
- final Queue queue = (Queue) _activation.getDestination();
+ final Queue queue = (Queue) _destination;
_consumer = _session.createConsumer(queue, selector);
}
- // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
- MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
- _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
- _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
- if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+ if (isXA())
{
final XAResource xaResource = ((XASession)_session).getXAResource() ;
- _endpoint = endpointFactory.createEndpoint(xaResource);
+ _endpoint = _endpointFactory.createEndpoint(xaResource);
}
else
{
- _endpoint = endpointFactory.createEndpoint(null);
+ _endpoint = _endpointFactory.createEndpoint(null);
}
_consumer.setMessageListener(this);
+ _connection.start();
+ _activated.set(true);
}
/**
@@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener
*/
public void teardown()
{
- if (_log.isTraceEnabled())
- {
- _log.trace("teardown()");
- }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("teardown()");
+ }
+ super.teardown();
+
try
{
if (_endpoint != null)
@@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener
try
{
- if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+ if (_spec.getTransactionTimeout() > 0 && _tm != null)
{
- _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+ _tm.setTransactionTimeout(_spec.getTransactionTimeout());
}
_endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
beforeDelivery = true;
- if(_transacted)
+ if(isXA())
{
message.acknowledge();
}
((MessageListener)_endpoint).onMessage(message);
- if (_transacted && (_tm.getTransaction() != null))
+ if (isXA() && (_tm.getTransaction() != null))
{
final int status = _tm.getStatus() ;
final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
|| status == Status.STATUS_ROLLING_BACK
|| status == Status.STATUS_ROLLEDBACK;
+
if (rollback)
{
_session.recover() ;
@@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
return;
}
- if (_useLocalTx)
+ if (!isXA() && _spec.isUseLocalTx())
{
_session.commit();
}
@@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener
_log.warn("Unable to call after delivery", e);
}
}
- if (_useLocalTx || !_activation.isDeliveryTransacted())
+ if (!isXA() && _spec.isUseLocalTx())
{
try
{
@@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener
}
}
+
+ public void start() throws Exception
+ {
+ _deliveryActive.set(true);
+ setup();
+ }
+
+ public void stop()
+ {
+ _deliveryActive.set(false);
+ teardown();
+ }
}