diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 241 |
1 files changed, 152 insertions, 89 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ff7bf4fd26..8e9382a9be 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -73,44 +73,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); - /** - * Used in the consume method. We generate the consume tag on the client so that we can use the nowait - * feature. - */ + /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ private int _nextTag = 1; - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; - /** - * Set of all producers created by this session - */ + /** Set of all producers created by this session */ private Map _producers = new ConcurrentHashMap(); - /** - * Maps from consumer tag (String) to JMSMessageConsumer instance - */ + /** Maps from consumer tag (String) to JMSMessageConsumer instance */ private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>(); - /** - * Maps from destination to count of JMSMessageConsumers - */ + /** Maps from destination to count of JMSMessageConsumers */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -127,33 +115,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protected static final boolean DEFAULT_MANDATORY = true; /** - * The counter of the next producer id. This id is generated by the session and used only to allow the - * producer to identify itself to the session when deregistering itself. - * <p/> - * Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. + * The counter of the next producer id. This id is generated by the session and used only to allow the producer to + * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be + * synchronized since according to the JMS specification only one thread of control is allowed to create producers + * for any given session instance. */ private long _nextProducerId; /** - * Set when recover is called. This is to handle the case where recover() is called by application code - * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * Set when recover is called. This is to handle the case where recover() is called by application code during + * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; private boolean _connectionStopped; private boolean _hasMessageListeners; + private boolean _suspended; + private final Object _suspensionLock = new Object(); - - /** - * Responsible for decoding a message fragment and passing it to the appropriate message consumer. - */ + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); @@ -276,6 +260,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //fixme awaitTermination } + + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + _queue.clear(); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.rollback(); + } + + setConnectionStopped(isStopped); + } + + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -318,7 +326,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); + + new Thread(new SuspenderRunner(true)).start(); } } @@ -327,7 +336,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); + + new Thread(new SuspenderRunner(false)).start(); } } }); @@ -533,18 +543,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rollback() throws JMSException { - checkTransacted(); - try - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); - } - catch (AMQException e) + synchronized (_suspensionLock) { - throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + checkTransacted(); + try + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + suspendChannel(true); + } + + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + } } } @@ -616,9 +647,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public boolean isSuspended() + { + return _suspended; + } + + /** - * Called when the server initiates the closure of the session - * unilaterally. + * Called when the server initiates the closure of the session unilaterally. * * @param e the exception that caused this session to be closed. Null causes the */ @@ -644,10 +680,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after - * failover when the client has veoted resubscription. - * <p/> - * The caller of this method must already hold the failover mutex. + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. */ void markClosed() { @@ -867,7 +901,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver * * @param destination + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException @@ -883,7 +919,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param destination * @param messageSelector + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException @@ -1146,7 +1184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param amqd * @param protocolHandler + * * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. + * * @throws AMQException */ private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException @@ -1198,6 +1238,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Register to consume from the queue. * * @param queueName + * * @return the consumer tag generated by the broker */ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, @@ -1284,7 +1325,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver wrapping a MessageConsumer * * @param queue + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue) throws JMSException @@ -1300,7 +1343,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param queue * @param messageSelector + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException @@ -1347,7 +1392,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a non-durable subscriber * * @param topic + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException @@ -1364,7 +1411,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param topic * @param messageSelector * @param noLocal + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException @@ -1434,9 +1483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - */ + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { @@ -1549,8 +1596,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. - * Puts the message onto the queue read by the dispatcher. + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. * * @param message the message that has been received */ @@ -1565,13 +1612,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from - * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is - * AUTO_ACK or similar. + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a + * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. * * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag + * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery + * tag */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { @@ -1609,7 +1655,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _channelId; } - void start() + void start() throws AMQException { //fixme This should be controlled by _stopped as it pairs with the stop method //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. @@ -1618,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); + suspendChannel(false); } if (hasMessageListeners()) @@ -1651,10 +1697,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - void stop() + void stop() throws AMQException { //stop the server delivering messages to this session - suspendChannel(); + suspendChannel(true); if (_dispatcher != null) { @@ -1666,6 +1712,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Callers must hold the failover mutex before calling this method. * * @param consumer + * * @throws AMQException */ void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException @@ -1691,8 +1738,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called by the MessageConsumer when closing, to deregister the consumer from the - * map from consumerTag to consumer instance. + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. * * @param consumer the consum */ @@ -1764,28 +1811,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel() + private void suspendChannel(boolean suspend) throws AMQException { - _logger.warn("Suspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) - false); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); - } + synchronized (_suspensionLock) + { + _logger.warn("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - private void unsuspendChannel() - { - _logger.warn("Unsuspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) - true); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + _suspended = suspend; + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + !suspend); // active + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } } public void confirmConsumerCancelled(String consumerTag) @@ -1829,4 +1871,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } } |