diff options
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java')
-rw-r--r-- | qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java | 880 |
1 files changed, 880 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java new file mode 100644 index 0000000000..fb1b7f060c --- /dev/null +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java @@ -0,0 +1,880 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.ra; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; +import javax.jms.XAQueueConnection; +import javax.jms.XASession; +import javax.jms.XATopicConnection; +import javax.resource.ResourceException; +import javax.resource.spi.ConnectionEvent; +import javax.resource.spi.ConnectionEventListener; +import javax.resource.spi.ConnectionRequestInfo; +import javax.resource.spi.IllegalStateException; +import javax.resource.spi.LocalTransaction; +import javax.resource.spi.ManagedConnection; +import javax.resource.spi.ManagedConnectionMetaData; +import javax.resource.spi.SecurityException; +import javax.security.auth.Subject; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The managed connection + * + */ +public class QpidRAManagedConnection implements ManagedConnection, ExceptionListener +{ + /** The logger */ + private static final Logger _log = LoggerFactory.getLogger(QpidRAManagedConnection.class); + + /** The managed connection factory */ + private final QpidRAManagedConnectionFactory _mcf; + + /** The connection request information */ + private final QpidRAConnectionRequestInfo _cri; + + /** The user name */ + private final String _userName; + + /** The password */ + private final String _password; + + /** Has the connection been destroyed */ + private final AtomicBoolean _isDestroyed = new AtomicBoolean(false); + + /** Event listeners */ + private final List<ConnectionEventListener> _eventListeners; + + /** Handles */ + private final Set<QpidRASessionImpl> _handles; + + /** Lock */ + private ReentrantLock _lock = new ReentrantLock(); + + // Physical JMS connection stuff + private Connection _connection; + + private XASession _xaSession; + + private XAResource _xaResource; + + private Session _session; + + private final TransactionManager _tm; + + private boolean _inManagedTx; + + /** + * Constructor + * @param mcf The managed connection factory + * @param cri The connection request information + * @param userName The user name + * @param password The password + */ + public QpidRAManagedConnection(final QpidRAManagedConnectionFactory mcf, + final QpidRAConnectionRequestInfo cri, + final TransactionManager tm, + final String userName, + final String password) throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("constructor(" + mcf + ", " + cri + ", " + userName + ", ****)"); + } + + this._mcf = mcf; + this._cri = cri; + this._tm = tm; + this._userName = userName; + this._password = password; + _eventListeners = Collections.synchronizedList(new ArrayList<ConnectionEventListener>()); + _handles = Collections.synchronizedSet(new HashSet<QpidRASessionImpl>()); + + try + { + setup(); + } + catch (Throwable t) + { + try + { + destroy(); + } + catch (Throwable ignored) + { + } + throw new ResourceException("Error during setup", t); + } + } + + /** + * Get a connection + * @param subject The security subject + * @param cxRequestInfo The request info + * @return The connection + * @exception ResourceException Thrown if an error occurs + */ + public synchronized Object getConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getConnection(" + subject + ", " + cxRequestInfo + ")"); + } + + // Check user first + QpidRACredential credential = QpidRACredential.getCredential(_mcf, subject, cxRequestInfo); + + // Null users are allowed! + if (_userName != null && !_userName.equals(credential.getUserName())) + { + throw new SecurityException("Password credentials not the same, reauthentication not allowed"); + } + + if (_userName == null && credential.getUserName() != null) + { + throw new SecurityException("Password credentials not the same, reauthentication not allowed"); + } + + if (_isDestroyed.get()) + { + throw new IllegalStateException("The managed connection is already destroyed"); + } + + QpidRASessionImpl session = new QpidRASessionImpl(this, (QpidRAConnectionRequestInfo)cxRequestInfo); + _handles.add(session); + return session; + } + + /** + * Destroy all handles. + * @exception ResourceException Failed to close one or more handles. + */ + private void destroyHandles() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("destroyHandles()"); + } + + try + { + if (_connection != null) + { + _connection.stop(); + } + } + catch (Throwable t) + { + _log.trace("Ignored error stopping connection", t); + } + + for (QpidRASessionImpl session : _handles) + { + session.destroy(); + } + + _handles.clear(); + } + + /** + * Destroy the physical connection. + * @exception ResourceException Could not property close the session and connection. + */ + public void destroy() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("destroy()"); + } + + if (_isDestroyed.get() || _connection == null) + { + return; + } + + _isDestroyed.set(true); + + try + { + _connection.setExceptionListener(null); + } + catch (JMSException e) + { + _log.debug("Error unsetting the exception listener " + this, e); + } + + destroyHandles(); + + try + { + try + { + if (_xaSession != null) + { + _xaSession.close(); + } + } + catch (JMSException e) + { + _log.debug("Error closing session " + this, e); + } + + if (_connection != null) + { + _connection.close(); + } + } + catch (Throwable e) + { + throw new ResourceException("Could not properly close the session and connection", e); + } + } + + /** + * Cleanup + * @exception ResourceException Thrown if an error occurs + */ + public void cleanup() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("cleanup()"); + } + + if (_isDestroyed.get()) + { + throw new IllegalStateException("ManagedConnection already destroyed"); + } + + destroyHandles(); + + _inManagedTx = false; + + // I'm recreating the lock object when we return to the pool + // because it looks too nasty to expect the connection handle + // to unlock properly in certain race conditions + // where the dissociation of the managed connection is "random". + _lock = new ReentrantLock(); + } + + /** + * Move a handler from one mc to this one. + * @param obj An object of type QpidRASession. + * @throws ResourceException Failed to associate connection. + * @throws IllegalStateException ManagedConnection in an illegal state. + */ + public void associateConnection(final Object obj) throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("associateConnection(" + obj + ")"); + } + + if (!_isDestroyed.get() && obj instanceof QpidRASessionImpl) + { + QpidRASessionImpl h = (QpidRASessionImpl)obj; + h.setManagedConnection(this); + _handles.add(h); + } + else + { + throw new IllegalStateException("ManagedConnection in an illegal state"); + } + } + + public void checkTransactionActive() throws JMSException + { + // don't bother looking at the transaction if there's an active XID + if (!_inManagedTx && _tm != null) + { + try + { + Transaction tx = _tm.getTransaction(); + if (tx != null) + { + int status = tx.getStatus(); + // Only allow states that will actually succeed + if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING && + status != Status.STATUS_PREPARED && + status != Status.STATUS_COMMITTING) + { + throw new javax.jms.IllegalStateException("Transaction " + tx + " not active"); + } + } + } + catch (SystemException e) + { + JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction"); + jmsE.initCause(e); + throw jmsE; + } + } + } + + + /** + * Aqquire a lock on the managed connection + */ + protected void lock() + { + if (_log.isTraceEnabled()) + { + _log.trace("lock()"); + } + + _lock.lock(); + } + + /** + * Aqquire a lock on the managed connection within the specified period + * @exception JMSException Thrown if an error occurs + */ + protected void tryLock() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("tryLock()"); + } + + Integer tryLock = _mcf.getUseTryLock(); + if (tryLock == null || tryLock.intValue() <= 0) + { + lock(); + return; + } + try + { + if (_lock.tryLock(tryLock.intValue(), TimeUnit.SECONDS) == false) + { + throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this); + } + } + catch (InterruptedException e) + { + throw new ResourceAllocationException("Interrupted attempting lock: " + this); + } + } + + /** + * Unlock the managed connection + */ + protected void unlock() + { + if (_log.isTraceEnabled()) + { + _log.trace("unlock()"); + } + + if (_lock.isHeldByCurrentThread()) + { + _lock.unlock(); + } + } + + /** + * Add a connection event listener. + * @param l The connection event listener to be added. + */ + public void addConnectionEventListener(final ConnectionEventListener l) + { + if (_log.isTraceEnabled()) + { + _log.trace("addConnectionEventListener(" + l + ")"); + } + + _eventListeners.add(l); + } + + /** + * Remove a connection event listener. + * @param l The connection event listener to be removed. + */ + public void removeConnectionEventListener(final ConnectionEventListener l) + { + if (_log.isTraceEnabled()) + { + _log.trace("removeConnectionEventListener(" + l + ")"); + } + + _eventListeners.remove(l); + } + + /** + * Get the XAResource for the connection. + * @return The XAResource for the connection. + * @exception ResourceException XA transaction not supported + */ + public XAResource getXAResource() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getXAResource()"); + } + + // + // Spec says a mc must allways return the same XA resource, + // so we cache it. + // + if (_xaResource == null) + { + _xaResource = new QpidRAXAResource(this, _xaSession.getXAResource()); + } + + if (_log.isTraceEnabled()) + { + _log.trace("XAResource=" + _xaResource); + } + + return _xaResource; + } + + /** + * Get the location transaction for the connection. + * @return The local transaction for the connection. + * @exception ResourceException Thrown if operation fails. + */ + public LocalTransaction getLocalTransaction() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getLocalTransaction()"); + } + + LocalTransaction tx = new QpidRALocalTransaction(this); + + if (_log.isTraceEnabled()) + { + _log.trace("LocalTransaction=" + tx); + } + + return tx; + } + + /** + * Get the meta data for the connection. + * @return The meta data for the connection. + * @exception ResourceException Thrown if the operation fails. + * @exception IllegalStateException Thrown if the managed connection already is destroyed. + */ + public ManagedConnectionMetaData getMetaData() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getMetaData()"); + } + + if (_isDestroyed.get()) + { + throw new IllegalStateException("The managed connection is already destroyed"); + } + + return new QpidRAMetaData(this); + } + + /** + * Set the log writer -- NOT SUPPORTED + * @param out The log writer + * @exception ResourceException If operation fails + */ + public void setLogWriter(final PrintWriter out) throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("setLogWriter(" + out + ")"); + } + } + + /** + * Get the log writer -- NOT SUPPORTED + * @return Always null + * @exception ResourceException If operation fails + */ + public PrintWriter getLogWriter() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("getLogWriter()"); + } + + return null; + } + + /** + * Notifies user of a JMS exception. + * @param exception The JMS exception + */ + public void onException(final JMSException exception) + { + if (_log.isTraceEnabled()) + { + _log.trace("onException(" + exception + ")"); + } + + if (_isDestroyed.get()) + { + if (_log.isTraceEnabled()) + { + _log.trace("Ignoring error on already destroyed connection " + this, exception); + } + return; + } + + _log.warn("Handling JMS exception failure: " + this, exception); + + try + { + _connection.setExceptionListener(null); + } + catch (JMSException e) + { + _log.debug("Unable to unset exception listener", e); + } + + ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception); + sendEvent(event); + } + + /** + * Get the session for this connection. + * @return The session + * @throws JMSException + */ + protected Session getSession() throws JMSException + { + if(_xaSession != null && !_mcf.getUseLocalTx()) + { + if (_log.isTraceEnabled()) + { + _log.trace("getSession() -> XA session " + Util.asString(_xaSession)); + } + + return _xaSession; + } + else + { + if (_log.isTraceEnabled()) + { + _log.trace("getSession() -> session " + Util.asString(_session)); + } + + return _session; + } + } + + /** + * Send an event. + * @param event The event to send. + */ + protected void sendEvent(final ConnectionEvent event) + { + if (_log.isTraceEnabled()) + { + _log.trace("sendEvent(" + event + ")"); + } + + int type = event.getId(); + + // convert to an array to avoid concurrent modification exceptions + ConnectionEventListener[] list = _eventListeners.toArray(new ConnectionEventListener[_eventListeners.size()]); + + for (ConnectionEventListener l : list) + { + switch (type) + { + case ConnectionEvent.CONNECTION_CLOSED: + l.connectionClosed(event); + break; + + case ConnectionEvent.LOCAL_TRANSACTION_STARTED: + l.localTransactionStarted(event); + break; + + case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED: + l.localTransactionCommitted(event); + break; + + case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK: + l.localTransactionRolledback(event); + break; + + case ConnectionEvent.CONNECTION_ERROR_OCCURRED: + l.connectionErrorOccurred(event); + break; + + default: + throw new IllegalArgumentException("Illegal eventType: " + type); + } + } + } + + /** + * Remove a handle from the handle map. + * @param handle The handle to remove. + */ + protected void removeHandle(final QpidRASessionImpl handle) + { + if (_log.isTraceEnabled()) + { + _log.trace("removeHandle(" + handle + ")"); + } + + _handles.remove(handle); + } + + /** + * Get the request info for this connection. + * @return The connection request info for this connection. + */ + protected QpidRAConnectionRequestInfo getCRI() + { + if (_log.isTraceEnabled()) + { + _log.trace("getCRI()"); + } + + return _cri; + } + + /** + * Get the connection factory for this connection. + * @return The connection factory for this connection. + */ + protected QpidRAManagedConnectionFactory getManagedConnectionFactory() + { + if (_log.isTraceEnabled()) + { + _log.trace("getManagedConnectionFactory()"); + } + + return _mcf; + } + + /** + * Start the connection + * @exception JMSException Thrown if the connection cant be started + */ + void start() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("start()"); + } + + if (_connection != null) + { + _connection.start(); + } + } + + /** + * Stop the connection + * @exception JMSException Thrown if the connection cant be stopped + */ + void stop() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("stop()"); + } + + if (_connection != null) + { + _connection.stop(); + } + } + + /** + * Get the user name + * @return The user name + */ + protected String getUserName() + { + if (_log.isTraceEnabled()) + { + _log.trace("getUserName()"); + } + + return _userName; + } + + /** + * Setup the connection. + * @exception ResourceException Thrown if a connection couldnt be created + */ + private void setup() throws ResourceException + { + if (_log.isTraceEnabled()) + { + _log.trace("setup()"); + } + + try + { + boolean transacted = _cri.isTransacted(); + int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + boolean localTx = _mcf.getUseLocalTx(); + + if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION) + { + if (_userName != null && _password != null) + { + if(!localTx) + { + _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password); + } + else + { + _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection(); + } + } + else + { + if(!localTx) + { + _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection(); + } + else + { + _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection(); + } + } + + if(!localTx) + { + _xaSession = ((XATopicConnection)_connection).createXATopicSession(); + + } + else + { + _session = ((TopicConnection)_connection).createTopicSession(localTx, acknowledgeMode); + } + } + else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION) + { + if (_userName != null && _password != null) + { + if(!localTx) + { + _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password); + } + else + { + _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection(); + } + } + else + { + if(!localTx) + { + _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection(); + } + else + { + _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection(); + } + } + + if(!localTx) + { + _xaSession = ((XAQueueConnection)_connection).createXAQueueSession(); + + } + else + { + _session = ((QueueConnection)_connection).createQueueSession(localTx, acknowledgeMode); + + } + } + else + { + if (_userName != null && _password != null) + { + if(!localTx) + { + _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password); + } + else + { + _connection = _mcf.getCleanAMQConnectionFactory().createConnection(); + } + } + else + { + if(!localTx) + { + _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection(); + } + else + { + _connection = _mcf.getDefaultAMQConnectionFactory().createConnection(); + } + } + + if(!localTx) + { + _xaSession = ((XAQueueConnection)_connection).createXASession(); + + } + else + { + _session = ((QueueConnection)_connection).createSession(localTx, acknowledgeMode); + + } + } + + _connection.setExceptionListener(this); + } + catch (JMSException je) + { + throw new ResourceException(je.getMessage(), je); + } + } + + protected void setInManagedTx(boolean inManagedTx) + { + this._inManagedTx = inManagedTx; + } + +} |