summaryrefslogtreecommitdiff
path: root/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java')
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java1732
1 files changed, 1732 insertions, 0 deletions
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
new file mode 100644
index 0000000000..a270253c13
--- /dev/null
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
@@ -0,0 +1,1732 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ManagedConnection;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A joint interface for JMS sessions
+ *
+ */
+public class QpidRASessionImpl implements Session, QueueSession, TopicSession, XASession, XAQueueSession, XATopicSession, QpidRASession
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRASessionImpl.class);
+
+ /** The managed connection */
+ private volatile QpidRAManagedConnection _mc;
+ /** The locked managed connection */
+ private QpidRAManagedConnection _lockedMC;
+
+ /** The connection request info */
+ private final QpidRAConnectionRequestInfo _cri;
+
+ /** The session factory */
+ private QpidRASessionFactory _sf;
+
+ /** The message consumers */
+ private final Set<MessageConsumer> _consumers;
+
+ /** The message producers */
+ private final Set<MessageProducer> _producers;
+
+ /** The queue browsers */
+ private final Set<QueueBrowser> _browsers;
+
+ /** Are we started */
+ private AtomicBoolean _started = new AtomicBoolean(false) ;
+
+ /**
+ * Constructor
+ * @param mc The managed connection
+ * @param cri The connection request info
+ */
+ public QpidRASessionImpl(final QpidRAManagedConnection mc, final QpidRAConnectionRequestInfo cri)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + mc + ", " + cri + ")");
+ }
+
+ this._mc = mc;
+ this._cri = cri;
+ _sf = null;
+ _consumers = new HashSet<MessageConsumer>();
+ _producers = new HashSet<MessageProducer>();
+ _browsers = new HashSet<QueueBrowser>();
+ }
+
+ /**
+ * Set the session factory
+ * @param sf The session factory
+ */
+ public void setQpidSessionFactory(final QpidRASessionFactory sf)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setQpidSessionFactory(" + sf + ")");
+ }
+
+ _started.set(false) ;
+ this._sf = sf;
+ }
+
+ /**
+ * Lock
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ protected void lock() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("lock()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.tryLock();
+ _lockedMC = mc ;
+ }
+ else
+ {
+ throw new IllegalStateException("Connection is not associated with a managed connection. " + this);
+ }
+ }
+
+ /**
+ * Unlock
+ */
+ protected void unlock()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("unlock()");
+ }
+
+ if (_lockedMC != null)
+ {
+ try
+ {
+ _lockedMC.unlock();
+ }
+ finally
+ {
+ _lockedMC = null ;
+ }
+ }
+
+ // We recreate the lock when returned to the pool
+ // so missing the unlock after disassociation is not important
+ }
+
+ /**
+ * Create a bytes message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public BytesMessage createBytesMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBytesMessage" + Util.asString(session));
+ }
+
+ return session.createBytesMessage();
+ }
+
+ /**
+ * Create a map message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MapMessage createMapMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createMapMessage" + Util.asString(session));
+ }
+
+ return session.createMapMessage();
+ }
+
+ /**
+ * Create a message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Message createMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createMessage" + Util.asString(session));
+ }
+
+ return session.createMessage();
+ }
+
+ /**
+ * Create an object message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createObjectMessage" + Util.asString(session));
+ }
+
+ return session.createObjectMessage();
+ }
+
+ /**
+ * Create an object message
+ * @param object The object
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createObjectMessage(" + object + ")" + Util.asString(session));
+ }
+
+ return session.createObjectMessage(object);
+ }
+
+ /**
+ * Create a stream message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createStreamMessage" + Util.asString(session));
+ }
+
+ return session.createStreamMessage();
+ }
+
+ /**
+ * Create a text message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TextMessage createTextMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTextMessage" + Util.asString(session));
+ }
+
+ return session.createTextMessage();
+ }
+
+ /**
+ * Create a text message
+ * @param string The text
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TextMessage createTextMessage(final String string) throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTextMessage(" + string + ")" + Util.asString(session));
+ }
+
+ return session.createTextMessage(string);
+ }
+
+ /**
+ * Get transacted
+ * @return True if transacted; otherwise false
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean getTransacted() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransacted()");
+ }
+
+ getSessionInternal();
+ return _cri.isTransacted();
+ }
+
+ /**
+ * Get the message listener -- throws IllegalStateException
+ * @return The message listener
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMessageListener()");
+ }
+
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Set the message listener -- Throws IllegalStateException
+ * @param listener The message listener
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setMessageListener(final MessageListener listener) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setMessageListener(" + listener + ")");
+ }
+
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Always throws an Error.
+ * @exception Error Method not allowed.
+ */
+ public void run()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("run()");
+ }
+
+ throw new Error("Method not allowed");
+ }
+
+ /**
+ * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
+ * managed connection.
+ * @exception JMSException Failed to close session.
+ */
+ public void close() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("close()");
+ }
+
+ _sf.closeSession(this);
+ closeSession();
+ }
+
+ /**
+ * Commit
+ * @exception JMSException Failed to close session.
+ */
+ public void commit() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new TransactionInProgressException("XA connection");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted() == false)
+ {
+ throw new IllegalStateException("Session is not transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Commit session " + this);
+ }
+
+ session.commit();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Rollback
+ * @exception JMSException Failed to close session.
+ */
+ public void rollback() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new TransactionInProgressException("XA connection");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted() == false)
+ {
+ throw new IllegalStateException("Session is not transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Rollback session " + this);
+ }
+
+ session.rollback();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Recover
+ * @exception JMSException Failed to close session.
+ */
+ public void recover() throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted())
+ {
+ throw new IllegalStateException("Session is transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Recover session " + this);
+ }
+
+ session.recover();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic
+ * @param topicName The topic name
+ * @return The topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Topic createTopic(final String topicName) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTopic " + Util.asString(session) + " topicName=" + topicName);
+ }
+
+ Topic result = session.createTopic(topicName);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTopic " + Util.asString(session) + " topic=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSubscriber " + Util.asString(session) + " topic=" + topic);
+ }
+
+ TopicSubscriber result = session.createSubscriber(topic);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSubscriber " + Util.asString(session) +
+ " topic=" +
+ topic +
+ " selector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a durable topic subscriber
+ * @param topic The topic
+ * @param name The name
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createDurableSubscriber " + Util.asString(session) + " topic=" + topic + " name=" + name);
+ }
+
+ TopicSubscriber result = session.createDurableSubscriber(topic, name);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @param name The name
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createDurableSubscriber(final Topic topic,
+ final String name,
+ final String messageSelector,
+ final boolean noLocal) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createDurableSubscriber " + Util.asString(session) +
+ " topic=" +
+ topic +
+ " name=" +
+ name +
+ " selector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic publisher
+ * @param topic The topic
+ * @return The publisher
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicPublisher createPublisher(final Topic topic) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createPublisher " + Util.asString(session) + " topic=" + topic);
+ }
+
+ TopicPublisher result = session.createPublisher(topic);
+ result = new QpidRATopicPublisher(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdPublisher " + Util.asString(session) + " publisher=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a temporary topic
+ * @return The temporary topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTemporaryTopic " + Util.asString(session));
+ }
+
+ TemporaryTopic temp = session.createTemporaryTopic();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTemporaryTopic " + Util.asString(session) + " temp=" + temp);
+ }
+
+ _sf.addTemporaryTopic(temp);
+
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Unsubscribe
+ * @param name The name
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void unsubscribe(final String name) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("unsubscribe " + Util.asString(session) + " name=" + name);
+ }
+
+ session.unsubscribe(name);
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a browser
+ * @param queue The queue
+ * @return The browser
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueBrowser createBrowser(final Queue queue) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueBrowser result = session.createBrowser(queue);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a browser
+ * @param queue The queue
+ * @param messageSelector The message selector
+ * @return The browser
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
+ }
+
+ QueueBrowser result = session.createBrowser(queue, messageSelector);
+ result = new QpidRAQueueBrowser(result, this);
+ addQueueBrowser(result) ;
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a queue
+ * @param queueName The queue name
+ * @return The queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Queue createQueue(final String queueName) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser or javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createQueue " + Util.asString(session) + " queueName=" + queueName);
+ }
+
+ Queue result = session.createQueue(queueName);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdQueue " + Util.asString(session) + " queue=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a queue receiver
+ * @param queue The queue
+ * @return The queue receiver
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueReceiver createReceiver(final Queue queue) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueReceiver result = session.createReceiver(queue);
+ result = new QpidRAQueueReceiver(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a queue receiver
+ * @param queue The queue
+ * @param messageSelector
+ * @return The queue receiver
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
+ }
+
+ QueueReceiver result = session.createReceiver(queue, messageSelector);
+ result = new QpidRAQueueReceiver(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a queue sender
+ * @param queue The queue
+ * @return The queue sender
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueSender createSender(final Queue queue) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSender " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueSender result = session.createSender(queue);
+ result = new QpidRAQueueSender(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSender " + Util.asString(session) + " sender=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a temporary queue
+ * @return The temporary queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTemporaryQueue " + Util.asString(session));
+ }
+
+ TemporaryQueue temp = session.createTemporaryQueue();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTemporaryQueue " + Util.asString(session) + " temp=" + temp);
+ }
+
+ _sf.addTemporaryQueue(temp);
+
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) + " dest=" + destination);
+ }
+
+ MessageConsumer result = session.createConsumer(destination);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @param messageSelector The message selector
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) +
+ " dest=" +
+ destination +
+ " messageSelector=" +
+ messageSelector);
+ }
+
+ MessageConsumer result = session.createConsumer(destination, messageSelector);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination,
+ final String messageSelector,
+ final boolean noLocal) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) +
+ " dest=" +
+ destination +
+ " messageSelector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message producer
+ * @param destination The destination
+ * @return The message producer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageProducer createProducer(final Destination destination) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createProducer " + Util.asString(session) + " dest=" + destination);
+ }
+
+ MessageProducer result = session.createProducer(destination);
+ result = new QpidRAMessageProducer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdProducer " + Util.asString(session) + " producer=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the acknowledge mode
+ * @return The mode
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int getAcknowledgeMode() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getAcknowledgeMode()");
+ }
+
+ getSessionInternal();
+ return _cri.getAcknowledgeMode();
+ }
+
+ /**
+ * Get the XA resource
+ * @return The XA resource
+ * @exception IllegalStateException If non XA connection
+ */
+ public XAResource getXAResource()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResource()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ return null;
+ }
+
+ try
+ {
+ lock();
+
+ return getXAResourceInternal();
+ }
+ catch (Throwable t)
+ {
+ return null;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the session
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Session getSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the queue session
+ * @return The queue session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueSession getQueueSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getQueueSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the topic session
+ * @return The topic session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSession getTopicSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTopicSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Set the managed connection
+ * @param managedConnection The managed connection
+ */
+ void setManagedConnection(final QpidRAManagedConnection managedConnection)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setManagedConnection(" + managedConnection + ")");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.removeHandle(this);
+ }
+
+ this._mc = managedConnection;
+ }
+
+ /** for tests only */
+ public ManagedConnection getManagedConnection()
+ {
+ return _mc;
+ }
+
+ /**
+ * Destroy
+ */
+ void destroy()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("destroy()");
+ }
+
+ _mc = null;
+ }
+
+ /**
+ * Start
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void start() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ _started.set(true) ;
+ mc.start();
+ }
+ }
+
+ /**
+ * Stop
+ * @exception JMSException Thrown if an error occurs
+ */
+ void stop() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.stop();
+ _started.set(false) ;
+ }
+ }
+
+ /**
+ * Check strict
+ * @exception JMSException Thrown if an error occurs
+ */
+ void checkStrict() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("checkStrict()");
+ }
+
+ if (_mc != null)
+ {
+ throw new IllegalStateException(QpidRASessionFactory.ISE);
+ }
+ }
+
+ /**
+ * Close session
+ * @exception JMSException Thrown if an error occurs
+ */
+ void closeSession() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ _log.trace("Closing session");
+
+ try
+ {
+ mc.stop();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error stopping managed connection", t);
+ }
+
+ synchronized (_consumers)
+ {
+ for (Iterator<MessageConsumer> i = _consumers.iterator(); i.hasNext();)
+ {
+ QpidRAMessageConsumer consumer = (QpidRAMessageConsumer)i.next();
+ try
+ {
+ consumer.closeConsumer();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing consumer", t);
+ }
+ i.remove();
+ }
+ }
+
+ synchronized (_producers)
+ {
+ for (Iterator<MessageProducer> i = _producers.iterator(); i.hasNext();)
+ {
+ QpidRAMessageProducer producer = (QpidRAMessageProducer)i.next();
+ try
+ {
+ producer.closeProducer();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing producer", t);
+ }
+ i.remove();
+ }
+ }
+
+ synchronized (_browsers)
+ {
+ for (Iterator<QueueBrowser> i = _browsers.iterator(); i.hasNext();)
+ {
+ QpidRAQueueBrowser browser = (QpidRAQueueBrowser)i.next();
+ try
+ {
+ browser.close();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing browser", t);
+ }
+ i.remove();
+ }
+ }
+
+ mc.removeHandle(this);
+ ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
+ ev.setConnectionHandle(this);
+ mc.sendEvent(ev);
+ this._mc = null;
+ }
+ }
+
+ /**
+ * Add consumer
+ * @param consumer The consumer
+ */
+ void addConsumer(final MessageConsumer consumer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addConsumer(" + consumer + ")");
+ }
+
+ synchronized (_consumers)
+ {
+ _consumers.add(consumer);
+ }
+ }
+
+ /**
+ * Remove consumer
+ * @param consumer The consumer
+ */
+ void removeConsumer(final MessageConsumer consumer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeConsumer(" + consumer + ")");
+ }
+
+ synchronized (_consumers)
+ {
+ _consumers.remove(consumer);
+ }
+ }
+
+ /**
+ * Add producer
+ * @param producer The producer
+ */
+ void addProducer(final MessageProducer producer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addProducer(" + producer + ")");
+ }
+
+ synchronized (_producers)
+ {
+ _producers.add(producer);
+ }
+ }
+
+ /**
+ * Remove producer
+ * @param producer The producer
+ */
+ void removeProducer(final MessageProducer producer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeProducer(" + producer + ")");
+ }
+
+ synchronized (_producers)
+ {
+ _producers.remove(producer);
+ }
+ }
+
+ /**
+ * Add queue browser
+ * @param browser The queue browser
+ */
+ void addQueueBrowser(final QueueBrowser browser)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addQueueBrowser(" + browser + ")");
+ }
+
+ synchronized (_browsers)
+ {
+ _browsers.add(browser);
+ }
+ }
+
+ /**
+ * Remove queue browser
+ * @param browser The queue browser
+ */
+ void removeQueueBrowser(final QueueBrowser browser)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeQueueBrowser(" + browser + ")");
+ }
+
+ synchronized (_browsers)
+ {
+ _browsers.remove(browser);
+ }
+ }
+
+ /**
+ * Get the session and ensure that it is open
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ Session getSessionInternal() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc == null)
+ {
+ throw new IllegalStateException("The session is closed");
+ }
+
+ Session session = mc.getSession();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSessionInternal " + Util.asString(session) + " for " + this);
+ }
+
+ return session;
+ }
+
+ /**
+ * Get the XA resource and ensure that it is open
+ * @return The XA Resource
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ XAResource getXAResourceInternal() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc == null)
+ {
+ throw new IllegalStateException("The session is closed");
+ }
+
+ try
+ {
+ XAResource xares = mc.getXAResource();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResourceInternal " + xares + " for " + this);
+ }
+
+ return xares;
+ }
+ catch (ResourceException e)
+ {
+ JMSException jmse = new JMSException("Unable to get XA Resource");
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+
+ /**
+ * Get the queue session
+ * @return The queue session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ QueueSession getQueueSessionInternal() throws JMSException
+ {
+ Session s = getSessionInternal();
+ if (!(s instanceof QueueSession))
+ {
+ throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this);
+ }
+ return (QueueSession)s;
+ }
+
+ /**
+ * Get the topic session
+ * @return The topic session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ TopicSession getTopicSessionInternal() throws JMSException
+ {
+ Session s = getSessionInternal();
+ if (!(s instanceof TopicSession))
+ {
+ throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this);
+ }
+ return (TopicSession)s;
+ }
+
+ /**
+ * @throws SystemException
+ * @throws RollbackException
+ *
+ */
+ public void checkState() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.checkTransactionActive();
+ }
+ }
+
+ /**
+ * Has this session been started?
+ * @return true if started, false if stopped.
+ */
+ public boolean isStarted()
+ {
+ return _started.get();
+ }
+}