diff options
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java')
-rw-r--r-- | qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java | 1732 |
1 files changed, 1732 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java new file mode 100644 index 0000000000..a270253c13 --- /dev/null +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java @@ -0,0 +1,1732 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.ra; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionInProgressException; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicSession; +import javax.resource.ResourceException; +import javax.resource.spi.ConnectionEvent; +import javax.resource.spi.ManagedConnection; +import javax.transaction.xa.XAResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A joint interface for JMS sessions + * + */ +public class QpidRASessionImpl implements Session, QueueSession, TopicSession, XASession, XAQueueSession, XATopicSession, QpidRASession +{ + /** The logger */ + private static final Logger _log = LoggerFactory.getLogger(QpidRASessionImpl.class); + + /** The managed connection */ + private volatile QpidRAManagedConnection _mc; + /** The locked managed connection */ + private QpidRAManagedConnection _lockedMC; + + /** The connection request info */ + private final QpidRAConnectionRequestInfo _cri; + + /** The session factory */ + private QpidRASessionFactory _sf; + + /** The message consumers */ + private final Set<MessageConsumer> _consumers; + + /** The message producers */ + private final Set<MessageProducer> _producers; + + /** The queue browsers */ + private final Set<QueueBrowser> _browsers; + + /** Are we started */ + private AtomicBoolean _started = new AtomicBoolean(false) ; + + /** + * Constructor + * @param mc The managed connection + * @param cri The connection request info + */ + public QpidRASessionImpl(final QpidRAManagedConnection mc, final QpidRAConnectionRequestInfo cri) + { + if (_log.isTraceEnabled()) + { + _log.trace("constructor(" + mc + ", " + cri + ")"); + } + + this._mc = mc; + this._cri = cri; + _sf = null; + _consumers = new HashSet<MessageConsumer>(); + _producers = new HashSet<MessageProducer>(); + _browsers = new HashSet<QueueBrowser>(); + } + + /** + * Set the session factory + * @param sf The session factory + */ + public void setQpidSessionFactory(final QpidRASessionFactory sf) + { + if (_log.isTraceEnabled()) + { + _log.trace("setQpidSessionFactory(" + sf + ")"); + } + + _started.set(false) ; + this._sf = sf; + } + + /** + * Lock + * @exception JMSException Thrown if an error occurs + * @exception IllegalStateException The session is closed + */ + protected void lock() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("lock()"); + } + + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + mc.tryLock(); + _lockedMC = mc ; + } + else + { + throw new IllegalStateException("Connection is not associated with a managed connection. " + this); + } + } + + /** + * Unlock + */ + protected void unlock() + { + if (_log.isTraceEnabled()) + { + _log.trace("unlock()"); + } + + if (_lockedMC != null) + { + try + { + _lockedMC.unlock(); + } + finally + { + _lockedMC = null ; + } + } + + // We recreate the lock when returned to the pool + // so missing the unlock after disassociation is not important + } + + /** + * Create a bytes message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public BytesMessage createBytesMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createBytesMessage" + Util.asString(session)); + } + + return session.createBytesMessage(); + } + + /** + * Create a map message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public MapMessage createMapMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createMapMessage" + Util.asString(session)); + } + + return session.createMapMessage(); + } + + /** + * Create a message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public Message createMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createMessage" + Util.asString(session)); + } + + return session.createMessage(); + } + + /** + * Create an object message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public ObjectMessage createObjectMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createObjectMessage" + Util.asString(session)); + } + + return session.createObjectMessage(); + } + + /** + * Create an object message + * @param object The object + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public ObjectMessage createObjectMessage(final Serializable object) throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createObjectMessage(" + object + ")" + Util.asString(session)); + } + + return session.createObjectMessage(object); + } + + /** + * Create a stream message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public StreamMessage createStreamMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createStreamMessage" + Util.asString(session)); + } + + return session.createStreamMessage(); + } + + /** + * Create a text message + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public TextMessage createTextMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createTextMessage" + Util.asString(session)); + } + + return session.createTextMessage(); + } + + /** + * Create a text message + * @param string The text + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public TextMessage createTextMessage(final String string) throws JMSException + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createTextMessage(" + string + ")" + Util.asString(session)); + } + + return session.createTextMessage(string); + } + + /** + * Get transacted + * @return True if transacted; otherwise false + * @exception JMSException Thrown if an error occurs + */ + public boolean getTransacted() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getTransacted()"); + } + + getSessionInternal(); + return _cri.isTransacted(); + } + + /** + * Get the message listener -- throws IllegalStateException + * @return The message listener + * @exception JMSException Thrown if an error occurs + */ + public MessageListener getMessageListener() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getMessageListener()"); + } + + throw new IllegalStateException("Method not allowed"); + } + + /** + * Set the message listener -- Throws IllegalStateException + * @param listener The message listener + * @exception JMSException Thrown if an error occurs + */ + public void setMessageListener(final MessageListener listener) throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("setMessageListener(" + listener + ")"); + } + + throw new IllegalStateException("Method not allowed"); + } + + /** + * Always throws an Error. + * @exception Error Method not allowed. + */ + public void run() + { + if (_log.isTraceEnabled()) + { + _log.trace("run()"); + } + + throw new Error("Method not allowed"); + } + + /** + * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the + * managed connection. + * @exception JMSException Failed to close session. + */ + public void close() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("close()"); + } + + _sf.closeSession(this); + closeSession(); + } + + /** + * Commit + * @exception JMSException Failed to close session. + */ + public void commit() throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new TransactionInProgressException("XA connection"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_cri.isTransacted() == false) + { + throw new IllegalStateException("Session is not transacted"); + } + + if (_log.isTraceEnabled()) + { + _log.trace("Commit session " + this); + } + + session.commit(); + } + finally + { + unlock(); + } + } + + /** + * Rollback + * @exception JMSException Failed to close session. + */ + public void rollback() throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new TransactionInProgressException("XA connection"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_cri.isTransacted() == false) + { + throw new IllegalStateException("Session is not transacted"); + } + + if (_log.isTraceEnabled()) + { + _log.trace("Rollback session " + this); + } + + session.rollback(); + } + finally + { + unlock(); + } + } + + /** + * Recover + * @exception JMSException Failed to close session. + */ + public void recover() throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (_cri.isTransacted()) + { + throw new IllegalStateException("Session is transacted"); + } + + if (_log.isTraceEnabled()) + { + _log.trace("Recover session " + this); + } + + session.recover(); + } + finally + { + unlock(); + } + } + + /** + * Create a topic + * @param topicName The topic name + * @return The topic + * @exception JMSException Thrown if an error occurs + */ + public Topic createTopic(final String topicName) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession"); + } + + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createTopic " + Util.asString(session) + " topicName=" + topicName); + } + + Topic result = session.createTopic(topicName); + + if (_log.isTraceEnabled()) + { + _log.trace("createdTopic " + Util.asString(session) + " topic=" + result); + } + + return result; + } + + /** + * Create a topic subscriber + * @param topic The topic + * @return The subscriber + * @exception JMSException Thrown if an error occurs + */ + public TopicSubscriber createSubscriber(final Topic topic) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createSubscriber " + Util.asString(session) + " topic=" + topic); + } + + TopicSubscriber result = session.createSubscriber(topic); + result = new QpidRATopicSubscriber(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic subscriber + * @param topic The topic + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The subscriber + * @exception JMSException Thrown if an error occurs + */ + public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createSubscriber " + Util.asString(session) + + " topic=" + + topic + + " selector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal); + result = new QpidRATopicSubscriber(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a durable topic subscriber + * @param topic The topic + * @param name The name + * @return The subscriber + * @exception JMSException Thrown if an error occurs + */ + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createDurableSubscriber " + Util.asString(session) + " topic=" + topic + " name=" + name); + } + + TopicSubscriber result = session.createDurableSubscriber(topic, name); + result = new QpidRATopicSubscriber(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic subscriber + * @param topic The topic + * @param name The name + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The subscriber + * @exception JMSException Thrown if an error occurs + */ + public TopicSubscriber createDurableSubscriber(final Topic topic, + final String name, + final String messageSelector, + final boolean noLocal) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createDurableSubscriber " + Util.asString(session) + + " topic=" + + topic + + " name=" + + name + + " selector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal); + result = new QpidRATopicSubscriber(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic publisher + * @param topic The topic + * @return The publisher + * @exception JMSException Thrown if an error occurs + */ + public TopicPublisher createPublisher(final Topic topic) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createPublisher " + Util.asString(session) + " topic=" + topic); + } + + TopicPublisher result = session.createPublisher(topic); + result = new QpidRATopicPublisher(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdPublisher " + Util.asString(session) + " publisher=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a temporary topic + * @return The temporary topic + * @exception JMSException Thrown if an error occurs + */ + public TemporaryTopic createTemporaryTopic() throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createTemporaryTopic " + Util.asString(session)); + } + + TemporaryTopic temp = session.createTemporaryTopic(); + + if (_log.isTraceEnabled()) + { + _log.trace("createdTemporaryTopic " + Util.asString(session) + " temp=" + temp); + } + + _sf.addTemporaryTopic(temp); + + return temp; + } + finally + { + unlock(); + } + } + + /** + * Unsubscribe + * @param name The name + * @exception JMSException Thrown if an error occurs + */ + public void unsubscribe(final String name) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("unsubscribe " + Util.asString(session) + " name=" + name); + } + + session.unsubscribe(name); + } + finally + { + unlock(); + } + } + + /** + * Create a browser + * @param queue The queue + * @return The browser + * @exception JMSException Thrown if an error occurs + */ + public QueueBrowser createBrowser(final Queue queue) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue); + } + + QueueBrowser result = session.createBrowser(queue); + + if (_log.isTraceEnabled()) + { + _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result); + } + + return result; + } + + /** + * Create a browser + * @param queue The queue + * @param messageSelector The message selector + * @return The browser + * @exception JMSException Thrown if an error occurs + */ + public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector); + } + + QueueBrowser result = session.createBrowser(queue, messageSelector); + result = new QpidRAQueueBrowser(result, this); + addQueueBrowser(result) ; + + if (_log.isTraceEnabled()) + { + _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result); + } + + return result; + } + + /** + * Create a queue + * @param queueName The queue name + * @return The queue + * @exception JMSException Thrown if an error occurs + */ + public Queue createQueue(final String queueName) throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser or javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createQueue " + Util.asString(session) + " queueName=" + queueName); + } + + Queue result = session.createQueue(queueName); + + if (_log.isTraceEnabled()) + { + _log.trace("createdQueue " + Util.asString(session) + " queue=" + result); + } + + return result; + } + + /** + * Create a queue receiver + * @param queue The queue + * @return The queue receiver + * @exception JMSException Thrown if an error occurs + */ + public QueueReceiver createReceiver(final Queue queue) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue); + } + + QueueReceiver result = session.createReceiver(queue); + result = new QpidRAQueueReceiver(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a queue receiver + * @param queue The queue + * @param messageSelector + * @return The queue receiver + * @exception JMSException Thrown if an error occurs + */ + public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector); + } + + QueueReceiver result = session.createReceiver(queue, messageSelector); + result = new QpidRAQueueReceiver(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a queue sender + * @param queue The queue + * @return The queue sender + * @exception JMSException Thrown if an error occurs + */ + public QueueSender createSender(final Queue queue) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createSender " + Util.asString(session) + " queue=" + queue); + } + + QueueSender result = session.createSender(queue); + result = new QpidRAQueueSender(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdSender " + Util.asString(session) + " sender=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a temporary queue + * @return The temporary queue + * @exception JMSException Thrown if an error occurs + */ + public TemporaryQueue createTemporaryQueue() throws JMSException + { + if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createTemporaryQueue " + Util.asString(session)); + } + + TemporaryQueue temp = session.createTemporaryQueue(); + + if (_log.isTraceEnabled()) + { + _log.trace("createdTemporaryQueue " + Util.asString(session) + " temp=" + temp); + } + + _sf.addTemporaryQueue(temp); + + return temp; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * @param destination The destination + * @return The message consumer + * @exception JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createConsumer " + Util.asString(session) + " dest=" + destination); + } + + MessageConsumer result = session.createConsumer(destination); + result = new QpidRAMessageConsumer(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * @param destination The destination + * @param messageSelector The message selector + * @return The message consumer + * @exception JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createConsumer " + Util.asString(session) + + " dest=" + + destination + + " messageSelector=" + + messageSelector); + } + + MessageConsumer result = session.createConsumer(destination, messageSelector); + result = new QpidRAMessageConsumer(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * @param destination The destination + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The message consumer + * @exception JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination, + final String messageSelector, + final boolean noLocal) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createConsumer " + Util.asString(session) + + " dest=" + + destination + + " messageSelector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal); + result = new QpidRAMessageConsumer(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message producer + * @param destination The destination + * @return The message producer + * @exception JMSException Thrown if an error occurs + */ + public MessageProducer createProducer(final Destination destination) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (_log.isTraceEnabled()) + { + _log.trace("createProducer " + Util.asString(session) + " dest=" + destination); + } + + MessageProducer result = session.createProducer(destination); + result = new QpidRAMessageProducer(result, this); + + if (_log.isTraceEnabled()) + { + _log.trace("createdProducer " + Util.asString(session) + " producer=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Get the acknowledge mode + * @return The mode + * @exception JMSException Thrown if an error occurs + */ + public int getAcknowledgeMode() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getAcknowledgeMode()"); + } + + getSessionInternal(); + return _cri.getAcknowledgeMode(); + } + + /** + * Get the XA resource + * @return The XA resource + * @exception IllegalStateException If non XA connection + */ + public XAResource getXAResource() + { + if (_log.isTraceEnabled()) + { + _log.trace("getXAResource()"); + } + + if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION) + { + return null; + } + + try + { + lock(); + + return getXAResourceInternal(); + } + catch (Throwable t) + { + return null; + } + finally + { + unlock(); + } + } + + /** + * Get the session + * @return The session + * @exception JMSException Thrown if an error occurs + */ + public Session getSession() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getSession()"); + } + + if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + /** + * Get the queue session + * @return The queue session + * @exception JMSException Thrown if an error occurs + */ + public QueueSession getQueueSession() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getQueueSession()"); + } + + if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + /** + * Get the topic session + * @return The topic session + * @exception JMSException Thrown if an error occurs + */ + public TopicSession getTopicSession() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("getTopicSession()"); + } + + if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || + _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + /** + * Set the managed connection + * @param managedConnection The managed connection + */ + void setManagedConnection(final QpidRAManagedConnection managedConnection) + { + if (_log.isTraceEnabled()) + { + _log.trace("setManagedConnection(" + managedConnection + ")"); + } + + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + mc.removeHandle(this); + } + + this._mc = managedConnection; + } + + /** for tests only */ + public ManagedConnection getManagedConnection() + { + return _mc; + } + + /** + * Destroy + */ + void destroy() + { + if (_log.isTraceEnabled()) + { + _log.trace("destroy()"); + } + + _mc = null; + } + + /** + * Start + * @exception JMSException Thrown if an error occurs + */ + public void start() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("start()"); + } + + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + _started.set(true) ; + mc.start(); + } + } + + /** + * Stop + * @exception JMSException Thrown if an error occurs + */ + void stop() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("stop()"); + } + + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + mc.stop(); + _started.set(false) ; + } + } + + /** + * Check strict + * @exception JMSException Thrown if an error occurs + */ + void checkStrict() throws JMSException + { + if (_log.isTraceEnabled()) + { + _log.trace("checkStrict()"); + } + + if (_mc != null) + { + throw new IllegalStateException(QpidRASessionFactory.ISE); + } + } + + /** + * Close session + * @exception JMSException Thrown if an error occurs + */ + void closeSession() throws JMSException + { + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + _log.trace("Closing session"); + + try + { + mc.stop(); + } + catch (Throwable t) + { + _log.trace("Error stopping managed connection", t); + } + + synchronized (_consumers) + { + for (Iterator<MessageConsumer> i = _consumers.iterator(); i.hasNext();) + { + QpidRAMessageConsumer consumer = (QpidRAMessageConsumer)i.next(); + try + { + consumer.closeConsumer(); + } + catch (Throwable t) + { + _log.trace("Error closing consumer", t); + } + i.remove(); + } + } + + synchronized (_producers) + { + for (Iterator<MessageProducer> i = _producers.iterator(); i.hasNext();) + { + QpidRAMessageProducer producer = (QpidRAMessageProducer)i.next(); + try + { + producer.closeProducer(); + } + catch (Throwable t) + { + _log.trace("Error closing producer", t); + } + i.remove(); + } + } + + synchronized (_browsers) + { + for (Iterator<QueueBrowser> i = _browsers.iterator(); i.hasNext();) + { + QpidRAQueueBrowser browser = (QpidRAQueueBrowser)i.next(); + try + { + browser.close(); + } + catch (Throwable t) + { + _log.trace("Error closing browser", t); + } + i.remove(); + } + } + + mc.removeHandle(this); + ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED); + ev.setConnectionHandle(this); + mc.sendEvent(ev); + this._mc = null; + } + } + + /** + * Add consumer + * @param consumer The consumer + */ + void addConsumer(final MessageConsumer consumer) + { + if (_log.isTraceEnabled()) + { + _log.trace("addConsumer(" + consumer + ")"); + } + + synchronized (_consumers) + { + _consumers.add(consumer); + } + } + + /** + * Remove consumer + * @param consumer The consumer + */ + void removeConsumer(final MessageConsumer consumer) + { + if (_log.isTraceEnabled()) + { + _log.trace("removeConsumer(" + consumer + ")"); + } + + synchronized (_consumers) + { + _consumers.remove(consumer); + } + } + + /** + * Add producer + * @param producer The producer + */ + void addProducer(final MessageProducer producer) + { + if (_log.isTraceEnabled()) + { + _log.trace("addProducer(" + producer + ")"); + } + + synchronized (_producers) + { + _producers.add(producer); + } + } + + /** + * Remove producer + * @param producer The producer + */ + void removeProducer(final MessageProducer producer) + { + if (_log.isTraceEnabled()) + { + _log.trace("removeProducer(" + producer + ")"); + } + + synchronized (_producers) + { + _producers.remove(producer); + } + } + + /** + * Add queue browser + * @param browser The queue browser + */ + void addQueueBrowser(final QueueBrowser browser) + { + if (_log.isTraceEnabled()) + { + _log.trace("addQueueBrowser(" + browser + ")"); + } + + synchronized (_browsers) + { + _browsers.add(browser); + } + } + + /** + * Remove queue browser + * @param browser The queue browser + */ + void removeQueueBrowser(final QueueBrowser browser) + { + if (_log.isTraceEnabled()) + { + _log.trace("removeQueueBrowser(" + browser + ")"); + } + + synchronized (_browsers) + { + _browsers.remove(browser); + } + } + + /** + * Get the session and ensure that it is open + * @return The session + * @exception JMSException Thrown if an error occurs + * @exception IllegalStateException The session is closed + */ + Session getSessionInternal() throws JMSException + { + final QpidRAManagedConnection mc = this._mc; + if (mc == null) + { + throw new IllegalStateException("The session is closed"); + } + + Session session = mc.getSession(); + + if (_log.isTraceEnabled()) + { + _log.trace("getSessionInternal " + Util.asString(session) + " for " + this); + } + + return session; + } + + /** + * Get the XA resource and ensure that it is open + * @return The XA Resource + * @exception JMSException Thrown if an error occurs + * @exception IllegalStateException The session is closed + */ + XAResource getXAResourceInternal() throws JMSException + { + final QpidRAManagedConnection mc = this._mc; + if (mc == null) + { + throw new IllegalStateException("The session is closed"); + } + + try + { + XAResource xares = mc.getXAResource(); + + if (_log.isTraceEnabled()) + { + _log.trace("getXAResourceInternal " + xares + " for " + this); + } + + return xares; + } + catch (ResourceException e) + { + JMSException jmse = new JMSException("Unable to get XA Resource"); + jmse.initCause(e); + throw jmse; + } + } + + /** + * Get the queue session + * @return The queue session + * @exception JMSException Thrown if an error occurs + * @exception IllegalStateException The session is closed + */ + QueueSession getQueueSessionInternal() throws JMSException + { + Session s = getSessionInternal(); + if (!(s instanceof QueueSession)) + { + throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this); + } + return (QueueSession)s; + } + + /** + * Get the topic session + * @return The topic session + * @exception JMSException Thrown if an error occurs + * @exception IllegalStateException The session is closed + */ + TopicSession getTopicSessionInternal() throws JMSException + { + Session s = getSessionInternal(); + if (!(s instanceof TopicSession)) + { + throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this); + } + return (TopicSession)s; + } + + /** + * @throws SystemException + * @throws RollbackException + * + */ + public void checkState() throws JMSException + { + final QpidRAManagedConnection mc = this._mc; + if (mc != null) + { + mc.checkTransactionActive(); + } + } + + /** + * Has this session been started? + * @return true if started, false if stopped. + */ + public boolean isStarted() + { + return _started.get(); + } +} |