diff options
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java')
-rw-r--r-- | qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java | 139 |
1 files changed, 88 insertions, 51 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java index 473efab31f..a02adf0dad 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.ra.inflow; +import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,6 +36,9 @@ import javax.transaction.Status; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.XAConnectionImpl; +import org.apache.qpid.ra.QpidResourceAdapter; import org.apache.qpid.ra.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,82 +47,100 @@ import org.slf4j.LoggerFactory; * The message handler * */ -public class QpidMessageHandler implements MessageListener +public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener { - /** - * The logger - */ private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class); - /** - * The session - */ - private final Session _session; - private MessageConsumer _consumer; - /** - * The endpoint - */ private MessageEndpoint _endpoint; - private final QpidActivation _activation; - - private boolean _useLocalTx; - - private boolean _transacted; + private Session _session; private final TransactionManager _tm; - public QpidMessageHandler(final QpidActivation activation, - final TransactionManager tm, - final Session session) + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm, + final Connection connection) throws ResourceException { - this._activation = activation; - this._session = session; + super(ra, spec, endpointFactory); this._tm = tm; + this._connection = connection; } - + + public QpidMessageHandler(final QpidResourceAdapter ra, + final QpidActivationSpec spec, + final MessageEndpointFactory endpointFactory, + final TransactionManager tm) throws ResourceException + { + super(ra, spec, endpointFactory); + this._tm = tm; + } + public void setup() throws Exception { if (_log.isTraceEnabled()) { _log.trace("setup()"); } - - QpidActivationSpec spec = _activation.getActivationSpec(); - String selector = spec.getMessageSelector(); - + + setupCF(); + setupDestination(); + String selector = _spec.getMessageSelector(); + + if(_spec.isUseConnectionPerHandler()) + { + setupConnection(); + _connection.setExceptionListener(this); + } + + if(isXA()) + { + _session = _ra.createXASession((XAConnectionImpl)_connection); + } + else + { + _session = _ra.createSession((AMQConnection)_connection, + _spec.getAcknowledgeModeInt(), + _spec.isUseLocalTx(), + _spec.getPrefetchLow(), + _spec.getPrefetchHigh()); + } // Create the message consumer - if (_activation.isTopic()) + if (_isTopic) { - final Topic topic = (Topic) _activation.getDestination(); - final String subscriptionName = spec.getSubscriptionName(); - if (spec.isSubscriptionDurable()) - _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + final Topic topic = (Topic) _destination; + final String subscriptionName = _spec.getSubscriptionName(); + + if (_spec.isSubscriptionDurable()) + { + _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false); + } else - _consumer = _session.createConsumer(topic, selector) ; + { + _consumer = _session.createConsumer(topic, selector) ; + } } else { - final Queue queue = (Queue) _activation.getDestination(); + final Queue queue = (Queue) _destination; _consumer = _session.createConsumer(queue, selector); } - // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX - MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory(); - _useLocalTx = _activation.getActivationSpec().isUseLocalTx(); - _transacted = _activation.isDeliveryTransacted() || _useLocalTx ; - if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx()) + if (isXA()) { final XAResource xaResource = ((XASession)_session).getXAResource() ; - _endpoint = endpointFactory.createEndpoint(xaResource); + _endpoint = _endpointFactory.createEndpoint(xaResource); } else { - _endpoint = endpointFactory.createEndpoint(null); + _endpoint = _endpointFactory.createEndpoint(null); } _consumer.setMessageListener(this); + _connection.start(); + _activated.set(true); } /** @@ -126,11 +148,13 @@ public class QpidMessageHandler implements MessageListener */ public void teardown() { - if (_log.isTraceEnabled()) - { - _log.trace("teardown()"); - } + if (_log.isTraceEnabled()) + { + _log.trace("teardown()"); + } + super.teardown(); + try { if (_endpoint != null) @@ -156,27 +180,28 @@ public class QpidMessageHandler implements MessageListener try { - if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null) + if (_spec.getTransactionTimeout() > 0 && _tm != null) { - _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout()); + _tm.setTransactionTimeout(_spec.getTransactionTimeout()); } _endpoint.beforeDelivery(QpidActivation.ONMESSAGE); beforeDelivery = true; - if(_transacted) + if(isXA()) { message.acknowledge(); } ((MessageListener)_endpoint).onMessage(message); - if (_transacted && (_tm.getTransaction() != null)) + if (isXA() && (_tm.getTransaction() != null)) { final int status = _tm.getStatus() ; final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLING_BACK || status == Status.STATUS_ROLLEDBACK; + if (rollback) { _session.recover() ; @@ -196,7 +221,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); return; } - if (_useLocalTx) + if (!isXA() && _spec.isUseLocalTx()) { _session.commit(); } @@ -216,7 +241,7 @@ public class QpidMessageHandler implements MessageListener _log.warn("Unable to call after delivery", e); } } - if (_useLocalTx || !_activation.isDeliveryTransacted()) + if (!isXA() && _spec.isUseLocalTx()) { try { @@ -241,5 +266,17 @@ public class QpidMessageHandler implements MessageListener } } + + public void start() throws Exception + { + _deliveryActive.set(true); + setup(); + } + + public void stop() + { + _deliveryActive.set(false); + teardown(); + } } |