summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java241
1 files changed, 152 insertions, 89 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ff7bf4fd26..8e9382a9be 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -73,44 +73,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
- /**
- * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
- * feature.
- */
+ /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
private int _nextTag = 1;
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
- /**
- * Set of all producers created by this session
- */
+ /** Set of all producers created by this session */
private Map _producers = new ConcurrentHashMap();
- /**
- * Maps from consumer tag (String) to JMSMessageConsumer instance
- */
+ /** Maps from consumer tag (String) to JMSMessageConsumer instance */
private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
- /**
- * Maps from destination to count of JMSMessageConsumers
- */
+ /** Maps from destination to count of JMSMessageConsumers */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -127,33 +115,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protected static final boolean DEFAULT_MANDATORY = true;
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the
- * producer to identify itself to the session when deregistering itself.
- * <p/>
- * Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
+ * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
+ * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
+ * synchronized since according to the JMS specification only one thread of control is allowed to create producers
+ * for any given session instance.
*/
private long _nextProducerId;
/**
- * Set when recover is called. This is to handle the case where recover() is called by application code
- * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * Set when recover is called. This is to handle the case where recover() is called by application code during
+ * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
private boolean _connectionStopped;
private boolean _hasMessageListeners;
+ private boolean _suspended;
+ private final Object _suspensionLock = new Object();
-
- /**
- * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
- */
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
+ /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
@@ -276,6 +260,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//fixme awaitTermination
}
+
+ public void rollback()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ _queue.clear();
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.rollback();
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -318,7 +326,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- suspendChannel();
+
+ new Thread(new SuspenderRunner(true)).start();
}
}
@@ -327,7 +336,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
+
+ new Thread(new SuspenderRunner(false)).start();
}
}
});
@@ -533,18 +543,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rollback() throws JMSException
{
- checkTransacted();
- try
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
- }
- catch (AMQException e)
+ synchronized (_suspensionLock)
{
- throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ checkTransacted();
+ try
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
+
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ }
}
}
@@ -616,9 +647,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+
/**
- * Called when the server initiates the closure of the session
- * unilaterally.
+ * Called when the server initiates the closure of the session unilaterally.
*
* @param e the exception that caused this session to be closed. Null causes the
*/
@@ -644,10 +680,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
- * failover when the client has veoted resubscription.
- * <p/>
- * The caller of this method must already hold the failover mutex.
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+ * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
*/
void markClosed()
{
@@ -867,7 +901,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver
*
* @param destination
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
@@ -883,7 +919,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param destination
* @param messageSelector
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
@@ -1146,7 +1184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param amqd
* @param protocolHandler
+ *
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+ *
* @throws AMQException
*/
private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
@@ -1198,6 +1238,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Register to consume from the queue.
*
* @param queueName
+ *
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
@@ -1284,7 +1325,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver wrapping a MessageConsumer
*
* @param queue
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1300,7 +1343,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param queue
* @param messageSelector
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
@@ -1347,7 +1392,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a non-durable subscriber
*
* @param topic
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1364,7 +1411,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param topic
* @param messageSelector
* @param noLocal
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
@@ -1434,9 +1483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- */
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -1549,8 +1596,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
- * Puts the message onto the queue read by the dispatcher.
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
*
* @param message the message that has been received
*/
@@ -1565,13 +1612,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
- * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
- * AUTO_ACK or similar.
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
+ * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
*
* @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery
+ * tag
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
@@ -1609,7 +1655,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _channelId;
}
- void start()
+ void start() throws AMQException
{
//fixme This should be controlled by _stopped as it pairs with the stop method
//fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
@@ -1618,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
- unsuspendChannel();
+ suspendChannel(false);
}
if (hasMessageListeners())
@@ -1651,10 +1697,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- void stop()
+ void stop() throws AMQException
{
//stop the server delivering messages to this session
- suspendChannel();
+ suspendChannel(true);
if (_dispatcher != null)
{
@@ -1666,6 +1712,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Callers must hold the failover mutex before calling this method.
*
* @param consumer
+ *
* @throws AMQException
*/
void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
@@ -1691,8 +1738,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called by the MessageConsumer when closing, to deregister the consumer from the
- * map from consumerTag to consumer instance.
+ * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+ * instance.
*
* @param consumer the consum
*/
@@ -1764,28 +1811,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private void suspendChannel()
+ private void suspendChannel(boolean suspend) throws AMQException
{
- _logger.warn("Suspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- false); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
- }
+ synchronized (_suspensionLock)
+ {
+ _logger.warn("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
- private void unsuspendChannel()
- {
- _logger.warn("Unsuspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- true); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ _suspended = suspend;
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ !suspend); // active
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
}
public void confirmConsumerCancelled(String consumerTag)
@@ -1829,4 +1871,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ private class SuspenderRunner implements Runnable
+ {
+ private boolean _suspend;
+
+ public SuspenderRunner(boolean suspend)
+ {
+ _suspend = suspend;
+ }
+
+ public void run()
+ {
+ try
+ {
+ suspendChannel(_suspend);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to suspend channel");
+ }
+ }
+ }
}