summaryrefslogtreecommitdiff
path: root/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java')
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java593
1 files changed, 593 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
new file mode 100644
index 0000000000..98427d7f9d
--- /dev/null
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
@@ -0,0 +1,593 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+
+/**
+ * The activation.
+ *
+ */
+public class QpidActivation implements ExceptionListener
+{
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
+
+ /**
+ * The onMessage method
+ */
+ public static final Method ONMESSAGE;
+
+ /**
+ * The resource adapter
+ */
+ private final QpidResourceAdapter _ra;
+
+ /**
+ * The activation spec
+ */
+ private final QpidActivationSpec _spec;
+
+ /**
+ * The message endpoint factory
+ */
+ private final MessageEndpointFactory _endpointFactory;
+
+ /**
+ * Whether delivery is active
+ */
+ private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+ /**
+ * The destination type
+ */
+ private boolean _isTopic = false;
+
+ /**
+ * Is the delivery transacted
+ */
+ private boolean _isDeliveryTransacted;
+
+ private Destination _destination;
+
+ /**
+ * The connection
+ */
+ private Connection _connection;
+
+ private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
+
+ private AMQConnectionFactory _factory;
+
+ // Whether we are in the failure recovery loop
+ private AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+ static
+ {
+ try
+ {
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param ra The resource adapter
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public QpidActivation(final QpidResourceAdapter ra,
+ final MessageEndpointFactory endpointFactory,
+ final QpidActivationSpec spec) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+ }
+
+ this._ra = ra;
+ this._endpointFactory = endpointFactory;
+ this._spec = spec;
+ try
+ {
+ _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException(e);
+ }
+ }
+
+ /**
+ * Get the activation spec
+ *
+ * @return The value
+ */
+ public QpidActivationSpec getActivationSpec()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getActivationSpec()");
+ }
+
+ return _spec;
+ }
+
+ /**
+ * Get the message endpoint factory
+ *
+ * @return The value
+ */
+ public MessageEndpointFactory getMessageEndpointFactory()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMessageEndpointFactory()");
+ }
+
+ return _endpointFactory;
+ }
+
+ /**
+ * Get whether delivery is transacted
+ *
+ * @return The value
+ */
+ public boolean isDeliveryTransacted()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isDeliveryTransacted()");
+ }
+
+ return _isDeliveryTransacted;
+ }
+
+ /**
+ * Get the work manager
+ *
+ * @return The value
+ */
+ public WorkManager getWorkManager()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getWorkManager()");
+ }
+
+ return _ra.getWorkManager();
+ }
+
+ /**
+ * Is the destination a topic
+ *
+ * @return The value
+ */
+ public boolean isTopic()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isTopic()");
+ }
+
+ return _isTopic;
+ }
+
+ /**
+ * Start the activation
+ *
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public void start() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start()");
+ }
+ _deliveryActive.set(true);
+ _ra.getWorkManager().scheduleWork(new SetupActivation());
+ }
+
+ /**
+ * Stop the activation
+ */
+ public void stop()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ _deliveryActive.set(false);
+ teardown();
+ }
+
+ /**
+ * Setup the activation
+ *
+ * @throws Exception Thrown if an error occurs
+ */
+ protected synchronized void setup() throws Exception
+ {
+ _log.debug("Setting up " + _spec);
+ setupCF();
+
+ setupDestination();
+ final AMQConnection amqConnection ;
+ final boolean useLocalTx = _spec.isUseLocalTx() ;
+ final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
+
+ if (isXA)
+ {
+ amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+ }
+ else
+ {
+ amqConnection = (AMQConnection)_factory.createConnection() ;
+ }
+
+ amqConnection.setExceptionListener(this) ;
+
+ for (int i = 0; i < _spec.getMaxSession(); i++)
+ {
+ Session session = null;
+
+ try
+ {
+ if (isXA)
+ {
+ session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
+ }
+ else
+ {
+ session = _ra.createSession((AMQConnection)amqConnection,
+ _spec.getAcknowledgeModeInt(),
+ useLocalTx,
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
+
+ _log.debug("Using session " + Util.asString(session));
+ QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
+ handler.setup();
+ _handlers.add(handler);
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ amqConnection.close() ;
+ }
+ catch (Exception e2)
+ {
+ _log.trace("Ignored error closing connection", e2);
+ }
+
+ throw e;
+ }
+ }
+ amqConnection.start() ;
+ this._connection = amqConnection ;
+
+ _log.debug("Setup complete " + this);
+ }
+
+ /**
+ * Teardown the activation
+ */
+ protected synchronized void teardown()
+ {
+ _log.debug("Tearing down " + _spec);
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error stopping connection " + Util.asString(_connection), t);
+ }
+
+ for (QpidMessageHandler handler : _handlers)
+ {
+ handler.teardown();
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error closing connection " + Util.asString(_connection), t);
+ }
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = null;
+ }
+ _log.debug("Tearing down complete " + this);
+ }
+
+ protected void setupCF() throws Exception
+ {
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = _ra.createAMQConnectionFactory(_spec);
+ }
+ else
+ {
+ _factory = _ra.getDefaultAMQConnectionFactory();
+ }
+ }
+
+ public Destination getDestination()
+ {
+ return _destination;
+ }
+
+ protected void setupDestination() throws Exception
+ {
+
+ String destinationName = _spec.getDestination();
+ String destinationTypeString = _spec.getDestinationType();
+
+ if (_spec.isUseJNDI())
+ {
+ Context ctx = new InitialContext();
+ _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setupDestination(" + ctx + ")");
+ }
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+
+ Class<? extends Destination> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ destinationType = Topic.class;
+ _isTopic = true;
+ }
+ else
+ {
+ destinationType = Queue.class;
+ }
+
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ destinationType.getName());
+ _destination = Util.lookup(ctx, destinationName, destinationType);
+ //_destination = (Destination)ctx.lookup(destinationName);
+
+ }
+ else
+ {
+ _log.debug("Destination type not defined");
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ Destination.class.getName());
+
+ _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+ else
+ {
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+ final boolean match ;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ match = (_destination instanceof Topic) ;
+ _isTopic = true;
+ }
+ else
+ {
+ match = (_destination instanceof Queue) ;
+ }
+ if (!match)
+ {
+ throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+ }
+ }
+ else
+ {
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+
+ _log.debug("Got destination " + _destination + " from " + destinationName);
+ }
+
+ /**
+ * Get a string representation
+ *
+ * @return The value
+ */
+ @Override
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(QpidActivation.class.getName()).append('(');
+ buffer.append("spec=").append(_spec.getClass().getName());
+ buffer.append(" mepf=").append(_endpointFactory.getClass().getName());
+ buffer.append(" active=").append(_deliveryActive.get());
+ if (_spec.getDestination() != null)
+ {
+ buffer.append(" destination=").append(_spec.getDestination());
+ }
+ buffer.append(" transacted=").append(_isDeliveryTransacted);
+ buffer.append(')');
+ return buffer.toString();
+ }
+
+ public void onException(final JMSException jmse)
+ {
+ handleFailure(jmse) ;
+ }
+
+ /**
+ * Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
+ */
+ public void handleFailure(Throwable failure)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+ }
+ else
+ {
+ _log.warn("Failure in Qpid activation " + _spec, failure);
+ }
+ int reconnectCount = 0;
+ int setupAttempts = _spec.getSetupAttempts();
+ long setupInterval = _spec.getSetupInterval();
+
+ // Only enter the failure loop once
+ if (_inFailure.getAndSet(true))
+ return;
+ try
+ {
+ while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+ {
+ teardown();
+
+ try
+ {
+ Thread.sleep(setupInterval);
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted trying to reconnect " + _spec, e);
+ break;
+ }
+
+ _log.info("Attempting to reconnect " + _spec);
+ try
+ {
+ setup();
+ _log.info("Reconnected with Qpid");
+ break;
+ }
+ catch (Throwable t)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+ }
+ else
+ {
+ _log.error("Unable to reconnect " + _spec, t);
+ }
+ }
+ ++reconnectCount;
+ }
+ }
+ finally
+ {
+ // Leaving failure recovery loop
+ _inFailure.set(false);
+ }
+ }
+
+ /**
+ * Check to see if the failure represents a missing endpoint
+ * @param failure The failure.
+ * @return true if it represents a missing endpoint, false otherwise
+ */
+ private boolean doesNotExist(final Throwable failure)
+ {
+ return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+ }
+
+ /**
+ * Handles the setup
+ */
+ private class SetupActivation implements Work
+ {
+ public void run()
+ {
+ try
+ {
+ setup();
+ }
+ catch (Throwable t)
+ {
+ handleFailure(t);
+ }
+ }
+
+ public void release()
+ {
+ }
+ }
+}