diff options
author | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
---|---|---|
committer | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
commit | 3b5d4734b777b54b52ce2710f404143aca8c5c6e (patch) | |
tree | d436e7a5239ec6be725852c12e7ccae975892745 | |
parent | 566e08caa331629a15bedca1d8cfc896886b0497 (diff) | |
download | qpid-python-3b5d4734b777b54b52ce2710f404143aca8c5c6e.tar.gz |
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 3171 insertions, 2480 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 347f5728e2..674f205af6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,34 +20,15 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; @@ -67,6 +48,27 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -96,8 +98,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>(); - + private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); private String _clientName; @@ -486,72 +487,72 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); + if (channelLimitReached()) { throw new ChannelLimitReachedException(_maximumChannelCount); } - else - { - return (org.apache.qpid.jms.Session) new FailoverSupport() + + return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( + new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() + { + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - public Object operation() throws JMSException + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) { - int channelId = _idFactory.incrementAndGet(); + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); - if (_logger.isDebugEnabled()) + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) { - _logger.debug("Write channel open frame for channel id " + channelId); + deregisterSession(channelId); } + } - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - //_protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; + if (_started) + { try { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; + session.start(); } catch (AMQException e) { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - deregisterSession(channelId); - } + throw new JMSAMQException(e); } - - if (_started) - { - try - { - session.start(); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } - } - - return session; } - }.execute(this); - } + + return session; + } + }, this).execute(); } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException, FailoverException { // TODO: Be aware of possible changes to parameter order as versions change. @@ -581,7 +582,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException { try { @@ -1128,14 +1130,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. */ - public void resubscribeSessions() throws JMSException, AMQException + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { ArrayList sessions = new ArrayList(_sessions.values()); _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - //_protocolHandler.addSessionByChannel(s.getChannelId(), s); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,7 +1225,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _taskPool.execute(task); } - public AMQSession getSession(int channelId) { return _sessions.get(channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b7615c5b7b..25c2d94377 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,47 +20,16 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Arrays; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.message.JMSMapMessage; @@ -70,21 +39,20 @@ import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AccessRequestBody; -import org.apache.qpid.framing.AccessRequestOkBody; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; @@ -92,358 +60,242 @@ import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQSession.class); + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + + /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + /** The default minimum number of prefetched messages at which to resume the channel. */ public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; + /** + * The default value for immediate flag used by producers created by this session is false. That is, a consumer does + * not need to be attached to a queue. + */ + protected static final boolean DEFAULT_IMMEDIATE = false; + + /** + * The default value for mandatory flag used by producers created by this session is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + protected static final boolean DEFAULT_MANDATORY = true; + + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; + + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; + + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + + /** The connection to which this session belongs. */ private AMQConnection _connection; + /** Used to indicate whether or not this is a transactional session. */ private boolean _transacted; + /** Holds the sessions acknowledgement mode. */ private int _acknowledgeMode; + /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; + /** @todo This does not appear to be set? */ private int _ticket; + /** Holds the high mark for prefetched message, at which the session is suspended. */ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + + /** Holds the low mark for prefetched messages, below which the session is resumed. */ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; + /** Used to indicate that this session has been started at least once. */ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client + * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client. */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); - private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); - /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ - private int _nextTag = 1; + /** + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. + */ + private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + new ConcurrentHashMap<BasicMessageConsumer, String>(); - /** This queue is bounded and is used to store messages before being dispatched to the consumer */ + /** + * Used to hold incoming messages. + * + * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. + */ private final FlowControllingBlockingQueue _queue; + /** Holds the dispatcher thread for this session. */ private Dispatcher _dispatcher; + /** Holds the message factory factory for this session. */ private MessageFactoryRegistry _messageFactoryRegistry; - /** Set of all producers created by this session */ - private Map _producers = new ConcurrentHashMap(); - - /** Maps from consumer tag (String) to JMSMessageConsumer instance */ - private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - - /** Maps from destination to count of JMSMessageConsumers */ - private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); + /** Holds all of the producers created by this session, keyed by their unique identifiers. */ + private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); /** - * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not - * need to be attached to a queue + * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */ - protected static final boolean DEFAULT_IMMEDIATE = false; + private int _nextTag = 1; /** - * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently - * drop messages where no queue is connected to the exchange for the message + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. */ - protected static final boolean DEFAULT_MANDATORY = true; + private Map<AMQShortString, BasicMessageConsumer> _consumers = + new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ + private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = + new ConcurrentHashMap<Destination, AtomicInteger>(); /** - * The counter of the next producer id. This id is generated by the session and used only to allow the producer to - * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be - * synchronized since according to the JMS specification only one thread of control is allowed to create producers - * for any given session instance. + * Used as a source of unique identifiers for producers within the session. + * + * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one + * thread of control is allowed to create producers for any given session instance. */ private long _nextProducerId; - /** * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * onMessage() processing to enure that an auto ack is not sent. */ private boolean _inRecovery; + /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; + /** Used to indicate that this session has a message listener attached to it. */ private boolean _hasMessageListeners; + /** Used to indicate that this session has been suspended. */ private boolean _suspended; + /** + * Used to protect the suspension of this session, so that critical code can be executed during suspension, + * without the session being resumed by other threads. + */ private final Object _suspensionLock = new Object(); - /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ + /** + * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. + * + * @todo This is accessed only within a synchronized method, so does not need to be atomic. + */ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_DEFAULT = "false"; + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + private final boolean _immediatePrefetch; + /** Indicates that warnings should be generated on violations of the strict AMQP. */ private final boolean _strictAMQP; - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - + /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ private final boolean _strictAMQPFATAL; - /** System property to enable immediate message prefetching */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - /** Immediate message prefetch default */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - - private final boolean _immediatePrefetch; - - private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); - - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - private class Dispatcher extends Thread - { - - /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private final Object _lock = new Object(); - - public Dispatcher() - { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } - } - - public void run() - { - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " started"); - } - - UnprocessedMessage message; - - // Allow disptacher to start stopped - synchronized (_lock) - { - while (connectionStopped()) - { - try - { - _lock.wait(); - } - catch (InterruptedException e) - { - // ignore - } - } - } - - try - { - while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) - { - synchronized (_lock) - { - - while (connectionStopped()) - { - _lock.wait(); - } - - dispatchMessage(message); - - while (connectionStopped()) - { - _lock.wait(); - } - - } - - } - } - catch (InterruptedException e) - { - //ignore - } - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); - } - } - - // only call while holding lock - final boolean connectionStopped() - { - return _connectionStopped; - } - - boolean setConnectionStopped(boolean connectionStopped) - { - boolean currently; - synchronized (_lock) - { - currently = _connectionStopped; - _connectionStopped = connectionStopped; - _lock.notify(); - - if (_dispatcherLogger.isDebugEnabled()) - { - _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + - ": Currently " + (currently ? "Stopped" : "Started")); - } - } - return currently; - } - - private void dispatchMessage(UnprocessedMessage message) - { - if (message.getDeliverBody() != null) - { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); - - if (consumer == null || consumer.isClosed()) - { - if (_dispatcherLogger.isInfoEnabled()) - { - if (consumer == null) - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - message.getDeliverBody().consumerTag + - " )without a handler - rejecting(requeue)..."); - } - else - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - " consumer(" + consumer.debugIdentity() + - ") is closed rejecting(requeue)..."); - } - } - // Don't reject if we're already closing - if (!_closed.get()) - { - rejectMessage(message, true); - } - } - else - { - consumer.notifyMessage(message, _channelId); - } - } - } - - public void close() - { - _closed.set(true); - interrupt(); - - //fixme awaitTermination - - } - - public void rollback() - { - - synchronized (_lock) - { - boolean isStopped = connectionStopped(); - - if (!isStopped) - { - setConnectionStopped(true); - } - - rejectAllMessages(true); - - _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - - for (BasicMessageConsumer consumer : _consumers.values()) - { - if (!consumer.isNoConsume()) - { - consumer.rollback(); - } - else - { - // should perhaps clear the _SQ here. - //consumer._synchronousQueue.clear(); - consumer.clearReceiveQueue(); - } - - - } - - setConnectionStopped(isStopped); - } - - } - - public void rejectPending(BasicMessageConsumer consumer) - { - synchronized (_lock) - { - boolean stopped = _dispatcher.connectionStopped(); - - if (!stopped) - { - _dispatcher.setConnectionStopped(true); - } - - // Reject messages on pre-receive queue - consumer.rollback(); - - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - - // closeConsumer - consumer.markClosed(); - - _dispatcher.setConnectionStopped(stopped); - - } - } - } - - - + /** + * Creates a new session on a connection. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param messageFactoryRegistry The message factory factory for the session. + * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); - _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); - _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _strictAMQPFATAL = + Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); + _immediatePrefetch = + _strictAMQP + || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; _transacted = transacted; @@ -455,6 +307,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _acknowledgeMode = acknowledgeMode; } + _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; _defaultPrefetchHighMark = defaultPrefetchHighMark; @@ -462,27 +315,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); - } - } - }); + _queue = + new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() + { + public void aboveThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(true)).start(); + } + } + + public void underThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(false)).start(); + } + } + }); } else { @@ -490,183 +348,146 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); - } - - public AMQConnection getAMQConnection() - { - return _connection; - } - - public BytesMessage createBytesMessage() throws JMSException + /** + * Creates a new session on a connection with the default message factory factory. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + */ + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, + int defaultPrefetchLow) { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - return new JMSBytesMessage(); - } + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, + defaultPrefetchLow); } - public MapMessage createMapMessage() throws JMSException + /** + * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. + * + * @throws IllegalStateException If the session is closed. + */ + public void acknowledge() throws IllegalStateException { - synchronized (_connection.getFailoverMutex()) + if (isClosed()) { - checkNotClosed(); - return new JMSMapMessage(); + throw new IllegalStateException("Session is already closed"); } - } - - public javax.jms.Message createMessage() throws JMSException - { - return createBytesMessage(); - } - public ObjectMessage createObjectMessage() throws JMSException - { - synchronized (_connection.getFailoverMutex()) + for (BasicMessageConsumer consumer : _consumers.values()) { - checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(); + consumer.acknowledge(); } } - public ObjectMessage createObjectMessage(Serializable object) throws JMSException - { - ObjectMessage msg = createObjectMessage(); - msg.setObject(object); - return msg; - } - - public StreamMessage createStreamMessage() throws JMSException + /** + * Acknowledge one or many messages. + * + * @param deliveryTag The tag of the last message to be acknowledged. + * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the + * delivery tag, <tt>false</tt> to just acknowledge that message. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); + final AMQFrame ackFrame = + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); - return new JMSStreamMessage(); - } - } - - public TextMessage createTextMessage() throws JMSException - { - synchronized (_connection.getFailoverMutex()) + if (_logger.isDebugEnabled()) { - checkNotClosed(); - - return new JMSTextMessage(); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - } - public TextMessage createTextMessage(String text) throws JMSException - { - - TextMessage msg = createTextMessage(); - msg.setText(text); - return msg; - } - - public boolean getTransacted() throws JMSException - { - checkNotClosed(); - return _transacted; - } - - public int getAcknowledgeMode() throws JMSException - { - checkNotClosed(); - return _acknowledgeMode; - } - - public void commit() throws JMSException - { - checkTransacted(); - try - { - // Acknowledge up to message last delivered (if any) for each consumer. - //need to send ack for messages delivered to consumers so far - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - //Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); - } - - // Commits outstanding messages sent and outstanding acknowledgements. - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQProtocolHandler handler = getProtocolHandler(); - - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion()), - TxCommitOkBody.class); - } - catch (AMQException e) - { - JMSException exception = new JMSException("Failed to commit: " + e.getMessage()); - exception.setLinkedException(e); - throw exception; - } + getProtocolHandler().writeFrame(ackFrame); } - - public void rollback() throws JMSException + /** + * Binds the named queue, with the specified routing key, to the named exchange. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? + */ + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName) throws AMQException { - synchronized (_suspensionLock) - { - checkTransacted(); - try + /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - - boolean isSuspended = isSuspended(); - - if (!isSuspended) - { - suspendChannel(true); - } - - if (_dispatcher != null) + public Object execute() throws AMQException, FailoverException { - _dispatcher.rollback(); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + + return null; } - - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - - - if (!isSuspended) - { - suspendChannel(false); - } - } - catch (AMQException e) - { - throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); - } - } + }, _connection).execute(); } + /** + * Closes the session with no timeout. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + */ public void close() throws JMSException { close(-1); } + /** + * Closes the session. + * + * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * the channel. This is because the channel is marked as closed before the request to close it is made, so the + * fail-over should not re-open it. + * + * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be + * re-opened. May need to examine this more carefully. + * + * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, + * because the failover process sends the failover event before acquiring the mutex itself. + */ public void close(long timeout) throws JMSException { if (_logger.isInfoEnabled()) { - _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.info("Closing session: " + this + ":" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session + // that can be called from a different thread of control from the one controlling the session. synchronized (_connection.getFailoverMutex()) { - //Ensure we only try and close an open session. + // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { // we pass null since this is not an error case @@ -676,18 +497,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { getProtocolHandler().closeSession(this); - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. } catch (AMQException e) { @@ -695,6 +516,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse.setLinkedException(e); throw jmse; } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } finally { _connection.deregisterSession(_channelId); @@ -703,65 +531,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - - private byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - - private byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - - /** - * Close all producers or consumers. This is called either in the error case or when closing the session normally. - * - * @param amqe the exception, may be null to indicate no error has occurred - */ - private void closeProducersAndConsumers(AMQException amqe) throws JMSException - { - JMSException jmse = null; - try - { - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - jmse = e; - } - try - { - closeConsumers(amqe); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - if (jmse == null) - { - jmse = e; - } - } - if (jmse != null) - { - throw jmse; - } - } - - - public boolean isSuspended() - { - return _suspended; - } - - /** * Called when the server initiates the closure of the session unilaterally. * @@ -783,738 +552,342 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { amqe = new AMQException("Closing session forcibly", e); } + _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); } } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover - * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. + * Commits all messages done in this transaction and releases any locks currently held. + * + * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the + * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the commit actually happened on the broker in this case. + * + * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does + * not mean that the commit is known to have failed, merely that it is not known whether it + * failed or not. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - void markClosed() + public void commit() throws JMSException { - _closed.set(true); - _connection.deregisterSession(_channelId); - markClosedProducersAndConsumers(); - - } + checkTransacted(); - private void markClosedProducersAndConsumers() - { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } try { - markClosedConsumers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - } + // Acknowledge up to message last delivered (if any) for each consumer. + // need to send ack for messages delivered to consumers so far + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { + // Sends acknowledgement to server + i.next().acknowledgeLastDelivered(); + } - /** - * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is - * currently no way of propagating errors to message producers (this is a JMS limitation). - */ - private void closeProducers() throws JMSException - { - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - final ArrayList clonedProducers = new ArrayList(_producers.values()); + // Commits outstanding messages sent and outstanding acknowledgements. + final AMQProtocolHandler handler = getProtocolHandler(); - final Iterator it = clonedProducers.iterator(); - while (it.hasNext()) - { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); - prod.close(); + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), + TxCommitOkBody.class); } - // at this point the _producers map is empty - } - - /** - * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - * - * @param error not null if this is a result of an error occurring at the connection level - */ - private void closeConsumers(Throwable error) throws JMSException - { - if (_dispatcher != null) + catch (AMQException e) { - _dispatcher.close(); - _dispatcher = null; + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values()); - - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); - while (it.hasNext()) + catch (FailoverException e) { - final BasicMessageConsumer con = it.next(); - if (error != null) - { - con.notifyError(error); - } - else - { - con.close(); - } + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } - // at this point the _consumers map will be empty } - private void markClosedConsumers() throws JMSException + public void confirmConsumerCancelled(AMQShortString consumerTag) { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final BasicMessageConsumer con = it.next(); - con.markClosed(); - } - // at this point the _consumers map will be empty - } - - /** - * Asks the broker to resend all unacknowledged messages for the session. - * - * @throws JMSException - */ - public void recover() throws JMSException - { - checkNotClosed(); - checkNotTransacted(); // throws IllegalStateException if a transacted session - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; - try + // Remove the consumer from the map + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if (consumer != null) { - - boolean isSuspended = isSuspended(); - - if (!isSuspended) + // fixme this isn't right.. needs to check if _queue contains data for this consumer + if (consumer.isAutoClose()) // && _queue.isEmpty()) { - suspendChannel(true); + consumer.closeWhenNoMessages(true); } - for (BasicMessageConsumer consumer : _consumers.values()) + if (!consumer.isNoConsume()) { - consumer.clearUnackedMessages(); - } + // Clean the Maps up first + // Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } + else + { + _logger.info("Dispatcher is null so created stopped dispatcher"); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } + startDistpatcherIfNecessary(true); + } - if (isStrictAMQP()) - { - // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + _dispatcher.rejectPending(consumer); } else { + // Just close the consumer + // fixme the CancelOK is being processed before the arriving messages.. + // The dispatcher is still to process them so the server sent in order but the client + // has yet to receive before the close comes in. - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); - } - if (!isSuspended) - { - suspendChannel(false); + // consumer.markClosed(); } } - catch (AMQException e) + else { - throw new JMSAMQException(e); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } - } - - boolean isInRecovery() - { - return _inRecovery; - } - void setInRecovery(boolean inRecovery) - { - _inRecovery = inRecovery; } - public void acknowledge() throws JMSException + public QueueBrowser createBrowser(Queue queue) throws JMSException { - if (isClosed()) - { - throw new IllegalStateException("Session is already closed"); - } - for (BasicMessageConsumer consumer : _consumers.values()) + if (isStrictAMQP()) { - consumer.acknowledge(); + throw new UnsupportedOperationException(); } - - } - - - public MessageListener getMessageListener() throws JMSException - { -// checkNotClosed(); - return _messageListener; - } - - public void setMessageListener(MessageListener listener) throws JMSException - { -// checkNotClosed(); -// -// if (_dispatcher != null && !_dispatcher.connectionStopped()) -// { -// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); -// } -// -// // We are stopped -// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) -// { -// BasicMessageConsumer consumer = i.next(); -// -// if (consumer.isReceiving()) -// { -// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); -// } -// } -// -// _messageListener = listener; -// -// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) -// { -// i.next().setMessageListener(_messageListener); -// } - - } - - public void run() - { - throw new java.lang.UnsupportedOperationException(); + return createBrowser(queue, null); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); - } + if (isStrictAMQP()) + { + throw new UnsupportedOperationException(); + } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate); - } + checkNotClosed(); + checkValidQueue(queue); - public BasicMessageProducer createProducer(Destination destination, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); - } + checkValidDestination(destination); - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, false); + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, true, true); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) - throws JMSException + public BytesMessage createBytesMessage() throws JMSException { - return (BasicMessageProducer) new FailoverSupport() + synchronized (_connection.getFailoverMutex()) { - public Object operation() throws JMSException - { - checkNotClosed(); - long producerId = getNextProducerId(); - BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), - producerId, immediate, mandatory, waitUntilSent); - registerProducer(producerId, producer); - return producer; - } - }.execute(_connection); - } - - /** - * Creates a QueueReceiver - * - * @param destination - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); - return new QueueReceiverAdaptor(dest, consumer); - } + checkNotClosed(); - /** - * Creates a QueueReceiver using a message selector - * - * @param destination - * @param messageSelector - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(destination, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); + return new JMSBytesMessage(); + } } public MessageConsumer createConsumer(Destination destination) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - null, - null, - false, - false); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null, + false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - messageSelector, - null, - false, - false); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - false, - false); - } - - public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - true, - true); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, false, false); + } + + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector) throws JMSException { checkValidDestination(destination); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } - - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector, false, false); + + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); } - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector, false, false); + + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, + false); } - protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetchHigh, - final int prefetchLow, - final boolean noLocal, - final boolean exclusive, - String selector, - final FieldTable rawSelector, - final boolean noConsume, - final boolean autoClose) throws JMSException + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkTemporaryDestination(destination); - final String messageSelector; - - if (_strictAMQP && !(selector == null || selector.equals(""))) + checkNotClosed(); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) { - if (_strictAMQPFATAL) + if (subscriber.getTopic().equals(topic)) { - throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); } else { - messageSelector = null; + unsubscribe(name); } } else { - messageSelector = selector; - } - - return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() - { - public Object operation() throws JMSException + AMQShortString topicName; + if (topic instanceof AMQTopic) { - checkNotClosed(); - - AMQDestination amqd = (AMQDestination) destination; + topicName = ((AMQTopic) topic).getDestinationName(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - // TODO: Define selectors in AMQP - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = FieldTableFactory.newFieldTable(); - //if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) + if (_strictAMQP) + { + if (_strictAMQPFATAL) { - ft.addAll(rawSelector); + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); } - - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode, noConsume, autoClose); - - if (_messageListener != null) + else { - consumer.setMessageListener(_messageListener); + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); } - try - { - registerConsumer(consumer, false); - } - catch (AMQInvalidArgumentException ise) - { - JMSException ex = new InvalidSelectorException(ise.getMessage()); - ex.setLinkedException(ise); - throw ex; - } - catch (AMQInvalidRoutingKeyException e) + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { - JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); - ide.setLinkedException(e); - throw ide; + deleteQueue(dest.getAMQQueueName()); } - catch (AMQException e) - { - JMSException ex = new JMSException("Error registering consumer: " + e); + } + } - if (_logger.isDebugEnabled()) - { - e.printStackTrace(); - } - ex.setLinkedException(e); - throw ex; - } + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return consumer; - } - }.execute(_connection); + return subscriber; } - private void checkTemporaryDestination(Destination destination) - throws JMSException + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException { - if ((destination instanceof TemporaryDestination)) - { - _logger.debug("destination is temporary"); - final TemporaryDestination tempDest = (TemporaryDestination) destination; - if (tempDest.getSession() != this) - { - _logger.debug("destination is on different session"); - throw new JMSException("Cannot consume from a temporary destination created onanother session"); - } - if (tempDest.isDeleted()) - { - _logger.debug("destination is deleted"); - throw new JMSException("Cannot consume from a deleted destination"); - } - } - } + checkNotClosed(); + checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + return subscriber; + } - public boolean hasConsumer(Destination destination) + public MapMessage createMapMessage() throws JMSException { - AtomicInteger counter = _destinationConsumerCount.get(destination); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); - return (counter != null) && (counter.get() != 0); + return new JMSMapMessage(); + } } - public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException + public javax.jms.Message createMessage() throws JMSException { - declareExchange(name, type, getProtocolHandler(), nowait); + return createBytesMessage(); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + public ObjectMessage createObjectMessage() throws JMSException { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); + + return (ObjectMessage) new JMSObjectMessage(); + } } - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type + ObjectMessage msg = createObjectMessage(); + msg.setObject(object); - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + return msg; } - - public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException + public BasicMessageProducer createProducer(Destination destination) throws JMSException { - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - + return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - - public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket - - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - /** - * Declare the queue. - * - * @param amqd - * @param protocolHandler - * - * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. - * - * @throws AMQException - */ - private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + throws JMSException { - // For queues (but not topics) we generate the name in the client rather than the - // server. This allows the name to be reused on failover if required. In general, - // the destination indicates whether it wants a name generated or not. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - //TODO verify the destiation is valid. else throw - - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket - - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); - return amqd.getAMQQueueName(); + return createProducerImpl(destination, mandatory, immediate); } - private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + boolean waitUntilSent) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } - /** - * Register to consume from the queue. - * - * @param queueName - * - * @return the consumer tag generated by the broker - */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector) throws AMQException + public TopicPublisher createPublisher(Topic topic) throws JMSException { - //need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); - - FieldTable arguments = FieldTableFactory.newFieldTable(); - if (messageSelector != null && !messageSelector.equals("")) - { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); - } - if (consumer.isAutoClose()) - { - arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); - } - if (consumer.isNoConsume()) - { - arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); - } - - consumer.setConsumerTag(tag); - // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + checkNotClosed(); - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - if (nowait) - { - protocolHandler.writeFrame(jmsConsume); - } - else - { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); - } - } - catch (AMQException e) - { - // clean-up the map in the event of an error - _consumers.remove(tag); - throw e; - } + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1540,9 +913,80 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public AMQShortString getDefaultQueueExchangeName() + /** + * Declares the named queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive) throws AMQException { - return _connection.getDefaultQueueExchangeName(); + new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return null; + } + }, _connection).execute(); + } + + /** + * Creates a QueueReceiver + * + * @param destination + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination) throws JMSException + { + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + + return new QueueReceiverAdaptor(dest, consumer); + } + + /** + * Creates a QueueReceiver using a message selector + * + * @param destination + * @param messageSelector + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException + { + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + + return new QueueReceiverAdaptor(dest, consumer); } /** @@ -1559,6 +1003,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + return new QueueReceiverAdaptor(dest, consumer); } @@ -1576,47 +1021,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(dest, messageSelector); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); } public QueueSender createSender(Queue queue) throws JMSException { checkNotClosed(); - //return (QueueSender) createProducer(queue); + + // return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } - public Topic createTopic(String topicName) throws JMSException + public StreamMessage createStreamMessage() throws JMSException { - checkNotClosed(); - - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); - } - else + synchronized (_connection.getFailoverMutex()) { - try - { - return new AMQTopic(new AMQBindingURL(topicName)); - } - catch (URLSyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); + checkNotClosed(); - throw jmse; - } + return new JMSStreamMessage(); } } - public AMQShortString getDefaultTopicExchangeName() - { - return _connection.getDefaultTopicExchangeName(); - } - /** * Creates a non-durable subscriber * @@ -1630,7 +1057,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); + + // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1649,150 +1077,401 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); + + // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + public TemporaryQueue createTemporaryQueue() throws JMSException { + checkNotClosed(); + return new AMQTemporaryQueue(this); + } + public TemporaryTopic createTemporaryTopic() throws JMSException + { checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) + + return new AMQTemporaryTopic(this); + } + + public TextMessage createTextMessage() throws JMSException + { + synchronized (_connection.getFailoverMutex()) { - if (subscriber.getTopic().equals(topic)) + checkNotClosed(); + + return new JMSTextMessage(); + } + } + + public TextMessage createTextMessage(String text) throws JMSException + { + + TextMessage msg = createTextMessage(); + msg.setText(text); + + return msg; + } + + public Topic createTopic(String topicName) throws JMSException + { + checkNotClosed(); + + if (topicName.indexOf('/') == -1) + { + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + } + else + { + try { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + - name); + return new AMQTopic(new AMQBindingURL(topicName)); } - else + catch (URLSyntaxException urlse) { - unsubscribe(name); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + + throw jmse; } } + } + + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException + { + declareExchange(name, type, getProtocolHandler(), nowait); + } + + public int getAcknowledgeMode() throws JMSException + { + checkNotClosed(); + + return _acknowledgeMode; + } + + public AMQConnection getAMQConnection() + { + return _connection; + } + + public int getChannelId() + { + return _channelId; + } + + public int getDefaultPrefetch() + { + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchHigh() + { + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchLow() + { + return _defaultPrefetchLowMark; + } + + public AMQShortString getDefaultQueueExchangeName() + { + return _connection.getDefaultQueueExchangeName(); + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _connection.getDefaultTopicExchangeName(); + } + + public MessageListener getMessageListener() throws JMSException + { + // checkNotClosed(); + return _messageListener; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _connection.getTemporaryQueueExchangeName(); + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _connection.getTemporaryTopicExchangeName(); + } + + public int getTicket() + { + return _ticket; + } + + public boolean getTransacted() throws JMSException + { + checkNotClosed(); + + return _transacted; + } + + public boolean hasConsumer(Destination destination) + { + AtomicInteger counter = _destinationConsumerCount.get(destination); + + return (counter != null) && (counter.get() != 0); + } + + public boolean isStrictAMQP() + { + return _strictAMQP; + } + + public boolean isSuspended() + { + return _suspended; + } + + /** + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. + * + * @param message the message that has been received + */ + public void messageReceived(UnprocessedMessage message) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Message[" + + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) + + "] received in session with channel id " + _channelId); + } + + if (message.getDeliverBody() == null) + { + // Return of the bounced message. + returnBouncedMessage(message); + } else { - AMQShortString topicName; - if (topic instanceof AMQTopic) + _queue.add(message); + } + } + + /** + * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. + * + * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all + * messages that have been delivered to the client. + * + * <p/>Restarting a session causes it to take the following actions: + * + * <ul> + * <li>Stop message delivery.</li> + * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". + * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * </ul> + * + * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and + * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible + * for the client to determine whether the broker is going to recover the session or not. + * + * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. + * Not that this does not necessarily mean that the recovery has failed, but simply that it + * is not possible to tell if it has or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void recover() throws JMSException + { + // Ensure that the session is open. + checkNotClosed(); + + // Ensure that the session is not transacted. + checkNotTransacted(); + + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; + try + { + + boolean isSuspended = isSuspended(); + + if (!isSuspended) { - topicName = ((AMQTopic) topic).getDestinationName(); + suspendChannel(true); } - else + + for (BasicMessageConsumer consumer : _consumers.values()) { - topicName = new AMQShortString(topic.getTopicName()); + consumer.clearUnackedMessages(); } - if (_strictAMQP) + if (_dispatcher != null) { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } + _dispatcher.rollback(); + } - deleteQueue(dest.getAMQQueueName()); + if (isStrictAMQP()) + { + // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; - } + _connection.getProtocolHandler().syncWrite( + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); + } - void deleteQueue(AMQShortString queueName) throws JMSException - { - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { - throw new JMSAMQException(e); + throw new JMSAMQException("Recover failed: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } } - /** Note, currently this does not handle reuse of the same name with different topics correctly. */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + public void rejectMessage(UnprocessedMessage message, boolean requeue) { - checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; - } - public TopicPublisher createPublisher(Topic topic) throws JMSException - { - checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); } - public QueueBrowser createBrowser(Queue queue) throws JMSException + public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (isStrictAMQP()) + if (_logger.isTraceEnabled()) { - throw new UnsupportedOperationException(); + _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); } - return createBrowser(queue, null); + rejectMessage(message.getDeliveryTag(), requeue); + } - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + public void rejectMessage(long deliveryTag, boolean requeue) { - if (isStrictAMQP()) + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) { - throw new UnsupportedOperationException(); - } + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } - checkNotClosed(); - checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); + AMQFrame basicRejectBody = + BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); + + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } - public TemporaryQueue createTemporaryQueue() throws JMSException + /** + * Commits all messages done in this transaction and releases any locks currently held. + * + * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the + * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. + * + * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does + * not mean that the rollback is known to have failed, merely that it is not known whether it + * failed or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void rollback() throws JMSException { - checkNotClosed(); - return new AMQTemporaryQueue(this); + synchronized (_suspensionLock) + { + checkTransacted(); + + try + { + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + suspendChannel(true); + } + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to rollback: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); + } + } } - public TemporaryTopic createTemporaryTopic() throws JMSException + public void run() { - checkNotClosed(); - return new AMQTemporaryTopic(this); + throw new java.lang.UnsupportedOperationException(); } + public void setMessageListener(MessageListener listener) throws JMSException + { + // checkNotClosed(); + // + // if (_dispatcher != null && !_dispatcher.connectionStopped()) + // { + // throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); + // } + // + // // We are stopped + // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + // { + // BasicMessageConsumer consumer = i.next(); + // + // if (consumer.isReceiving()) + // { + // throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); + // } + // } + // + // _messageListener = listener; + // + // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + // { + // i.next().setMessageListener(_messageListener); + // } + + } + + /*public void setTicket(int ticket) + { + _ticket = ticket; + }*/ + public void unsubscribe(String name) throws JMSException { checkNotClosed(); @@ -1815,7 +1494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); + + " Requesting queue deletion regardless."); } deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); @@ -1835,189 +1514,248 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { - return isQueueBound(exchangeName, queueName, null); - } + checkTemporaryDestination(destination); - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException - { + final String messageSelector; - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - exchangeName, // exchange - queueName, // queue - routingKey); // routingKey - AMQMethodEvent response = null; - try + if (_strictAMQP && !((selector == null) || selector.equals(""))) { - response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + } + else + { + messageSelector = null; + } } - catch (AMQException e) + else { - throw new JMSAMQException(e); + messageSelector = selector; } - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency - } - private void checkTransacted() throws JMSException - { - if (!getTransacted()) - { - throw new IllegalStateException("Session is not transacted"); - } - } + return new FailoverRetrySupport<MessageConsumer, JMSException>( + new FailoverProtectedOperation<MessageConsumer, JMSException>() + { + public MessageConsumer execute() throws JMSException, FailoverException + { + checkNotClosed(); - private void checkNotTransacted() throws JMSException - { - if (getTransacted()) - { - throw new IllegalStateException("Session is transacted"); - } + AMQDestination amqd = (AMQDestination) destination; + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + // TODO: Define selectors in AMQP + // TODO: construct the rawSelector from the selector string if rawSelector == null + final FieldTable ft = FieldTableFactory.newFieldTable(); + // if (rawSelector != null) + // ft.put("headers", rawSelector.getDataAsBytes()); + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + BasicMessageConsumer consumer = + new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, + _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); + + if (_messageListener != null) + { + consumer.setMessageListener(_messageListener); + } + + try + { + registerConsumer(consumer, false); + } + catch (AMQInvalidArgumentException ise) + { + JMSException ex = new InvalidSelectorException(ise.getMessage()); + ex.setLinkedException(ise); + throw ex; + } + catch (AMQInvalidRoutingKeyException e) + { + JMSException ide = + new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); + ide.setLinkedException(e); + throw ide; + } + catch (AMQException e) + { + JMSException ex = new JMSException("Error registering consumer: " + e); + + if (_logger.isDebugEnabled()) + { + e.printStackTrace(); + } + + ex.setLinkedException(e); + throw ex; + } + + synchronized (destination) + { + _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); + _destinationConsumerCount.get(destination).incrementAndGet(); + } + + return consumer; + } + }, _connection).execute(); } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto - * the queue read by the dispatcher. + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. * - * @param message the message that has been received + * @param consumer the consum */ - public void messageReceived(UnprocessedMessage message) + void deregisterConsumer(BasicMessageConsumer consumer) { - if (_logger.isDebugEnabled()) + if (_consumers.remove(consumer.getConsumerTag()) != null) { - _logger.debug("Message[" + (message.getDeliverBody() == null ? - "B:" + message.getBounceBody() : "D:" + message.getDeliverBody()) - + "] received in session with channel id " + _channelId); - } + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } - if (message.getDeliverBody() == null) - { - // Return of the bounced message. - returnBouncedMessage(message); - } - else - { - _queue.add(message); + Destination dest = consumer.getDestination(); + synchronized (dest) + { + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } + } } } - private void returnBouncedMessage(final UnprocessedMessage message) + void deregisterProducer(long producerId) { - _connection.performConnectionTask( - new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, - message.getContentHeader(), - message.getBodies()); - - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + _producers.remove(new Long(producerId)); + } - } - catch (Exception e) - { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - }); + boolean isInRecovery() + { + return _inRecovery; + } + + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + { + return isQueueBound(exchangeName, queueName, null); } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a - * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. + * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key. * - * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery - * tag + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param exchangeName The exchange name to test for binding against. + * @param queueName The queue name to check if bound. + * @param routingKey The routing key to check if the queue is bound under. + * + * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. + * + * @throws JMSException If the query fails for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) + boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple - if (_logger.isDebugEnabled()) + try { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + AMQMethodEvent response = + new FailoverRetrySupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = + ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), + getProtocolMinorVersion(), exchangeName, // exchange + queueName, // queue + routingKey); // routingKey + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + + } + }, _connection).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.replyCode == 0); + } + catch (AMQException e) + { + throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - getProtocolHandler().writeFrame(ackFrame); } - public int getDefaultPrefetch() + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. + */ + void markClosed() { - return _defaultPrefetchHighMark; + _closed.set(true); + _connection.deregisterSession(_channelId); + markClosedProducersAndConsumers(); + } - public int getDefaultPrefetchHigh() + /** + * Resubscribes all producers and consumers. This is called when performing failover. + * + * @throws AMQException + */ + void resubscribe() throws AMQException { - return _defaultPrefetchHighMark; + resubscribeProducers(); + resubscribeConsumers(); } - public int getDefaultPrefetchLow() + void setHasMessageListeners() { - return _defaultPrefetchLowMark; + _hasMessageListeners = true; } - public int getChannelId() + void setInRecovery(boolean inRecovery) { - return _channelId; + _inRecovery = inRecovery; } + /** + * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. + * + * @throws AMQException If the session cannot be started for any reason. + * + * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. + */ void start() throws AMQException { - //fixme This should be controlled by _stopped as it pairs with the stop method - //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. - //will result in sending Flow messages for each subsequent call to flow.. only need to do this - // if we have called stop. + // Check if the session has perviously been started and suspended, in which case it must be unsuspended. if (_startedAtLeastOnce.getAndSet(true)) { - //then we stopped this and are restarting, so signal server to resume delivery suspendChannel(false); } + // If the event dispatcher is not running then start it too. if (hasMessageListeners()) { startDistpatcherIfNecessary(); } } - private boolean hasMessageListeners() - { - return _hasMessageListeners; - } - - void setHasMessageListeners() - { - _hasMessageListeners = true; - } - synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching @@ -2032,7 +1770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - _logger.info("Suspending channel threw an exception:" + e); + _logger.info("Unsuspending channel threw an exception:" + e); } } } @@ -2057,7 +1795,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void stop() throws AMQException { - //stop the server delivering messages to this session + // Stop the server delivering messages to this session. suspendChannel(true); if (_dispatcher != null) @@ -2066,320 +1804,556 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - /** - * Callers must hold the failover mutex before calling this method. + /* + * Binds the named queue, with the specified routing key, to the named exchange. * - * @param consumer + * <p/>Note that this operation automatically retries in the event of fail-over. * - * @throws AMQException + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. */ - void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException + /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) + throws AMQException, FailoverException { - AMQDestination amqd = consumer.getDestination(); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments + amqd.getExchangeName(), // exchange + false, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + getTicket()); // ticket - AMQProtocolHandler protocolHandler = getProtocolHandler(); - - declareExchange(amqd, protocolHandler, false); - - AMQShortString queueName = declareQueue(amqd, protocolHandler); - - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - - // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch - if (!_immediatePrefetch) - { - // The dispatcher will be null if we have just created this session - // so suspend the channel before we register our consumer so that we don't - // start prefetching until a receive/mListener is set. - if (_dispatcher == null) - { - if (!isSuspended()) - { - try - { - suspendChannel(true); - _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } - } - } - } - else - { - _logger.info("Immediately prefetching existing messages to new consumer."); - } + protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + }*/ - try - { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) //thrown by getMessageSelector + private void checkNotTransacted() throws JMSException + { + if (getTransacted()) { - throw new AMQException(e.getMessage(), e); + throw new IllegalStateException("Session is transacted"); } } - /** - * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer - * instance. - * - * @param consumer the consum - */ - void deregisterConsumer(BasicMessageConsumer consumer) + private void checkTemporaryDestination(Destination destination) throws JMSException { - if (_consumers.remove(consumer.getConsumerTag()) != null) + if ((destination instanceof TemporaryDestination)) { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) + _logger.debug("destination is temporary"); + final TemporaryDestination tempDest = (TemporaryDestination) destination; + if (tempDest.getSession() != this) { - _subscriptions.remove(subscriptionName); + _logger.debug("destination is on different session"); + throw new JMSException("Cannot consume from a temporary destination created onanother session"); } - Destination dest = consumer.getDestination(); - synchronized (dest) + if (tempDest.isDeleted()) { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) - { - _destinationConsumerCount.remove(dest); - } + _logger.debug("destination is deleted"); + throw new JMSException("Cannot consume from a deleted destination"); } } } - private void registerProducer(long producerId, MessageProducer producer) + private void checkTransacted() throws JMSException { - _producers.put(new Long(producerId), producer); + if (!getTransacted()) + { + throw new IllegalStateException("Session is not transacted"); + } } - void deregisterProducer(long producerId) + private void checkValidDestination(Destination destination) throws InvalidDestinationException { - _producers.remove(new Long(producerId)); + if (destination == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - private long getNextProducerId() + private void checkValidQueue(Queue queue) throws InvalidDestinationException { - return ++_nextProducerId; + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - /** - * Resubscribes all producers and consumers. This is called when performing failover. - * - * @throws AMQException + /* + * I could have combined the last 3 methods, but this way it improves readability */ - void resubscribe() throws AMQException + private AMQTopic checkValidTopic(Topic topic) throws JMSException { - resubscribeProducers(); - resubscribeConsumers(); + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } + + if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on a temporary topic created in another session"); + } + + if (!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + + topic.getClass().getName()); + } + + return (AMQTopic) topic; } - private void resubscribeProducers() throws AMQException + /** + * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + * + * @param error not null if this is a result of an error occurring at the connection level + */ + private void closeConsumers(Throwable error) throws JMSException { - ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey - for (Iterator it = producers.iterator(); it.hasNext();) + if (_dispatcher != null) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); - producer.resubscribe(); + _dispatcher.close(); + _dispatcher = null; + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + while (it.hasNext()) + { + final BasicMessageConsumer con = it.next(); + if (error != null) + { + con.notifyError(error); + } + else + { + con.close(); + } } + // at this point the _consumers map will be empty } - private void resubscribeConsumers() throws AMQException + /** + * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + * currently no way of propagating errors to message producers (this is a JMS limitation). + */ + private void closeProducers() throws JMSException { - ArrayList consumers = new ArrayList(_consumers.values()); - _consumers.clear(); + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + final ArrayList clonedProducers = new ArrayList(_producers.values()); - for (Iterator it = consumers.iterator(); it.hasNext();) + final Iterator it = clonedProducers.iterator(); + while (it.hasNext()) { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer, true); + final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + prod.close(); } + // at this point the _producers map is empty } - private void suspendChannel(boolean suspend) throws AMQException + /** + * Close all producers or consumers. This is called either in the error case or when closing the session normally. + * + * @param amqe the exception, may be null to indicate no error has occurred + */ + private void closeProducersAndConsumers(AMQException amqe) throws JMSException { - synchronized (_suspensionLock) + JMSException jmse = null; + try { - if (_logger.isDebugEnabled()) + closeProducers(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + jmse = e; + } + + try + { + closeConsumers(amqe); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + if (jmse == null) { - _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + jmse = e; } + } - _suspended = suspend; - - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - !suspend); // active - - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + if (jmse != null) + { + throw jmse; } } - - public void confirmConsumerCancelled(AMQShortString consumerTag) + /** + * Register to consume from the queue. + * + * @param queueName + */ + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { + // need to generate a consumer tag on the client so we can exploit the nowait flag + AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); - // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if (consumer != null) + FieldTable arguments = FieldTableFactory.newFieldTable(); + if ((messageSelector != null) && !messageSelector.equals("")) { -// fixme this isn't right.. needs to check if _queue contains data for this consumer - if (consumer.isAutoClose())// && _queue.isEmpty()) - { - consumer.closeWhenNoMessages(true); - } + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } - if (!consumer.isNoConsume()) - { - //Clean the Maps up first - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _logger.info("Dispatcher is not null"); - } - else - { - _logger.info("Dispatcher is null so created stopped dispatcher"); + if (consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } - startDistpatcherIfNecessary(true); - } + if (consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } - _dispatcher.rejectPending(consumer); + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); + + try + { + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = + BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket + + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); } else { - //Just close the consumer - //fixme the CancelOK is being processed before the arriving messages.. - // The dispatcher is still to process them so the server sent in order but the client - // has yet to receive before the close comes in. - -// consumer.markClosed(); + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); } } - else + catch (AMQException e) { - _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); + // clean-up the map in the event of an error + _consumers.remove(tag); + throw e; } + } + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + throws JMSException + { + return createProducerImpl(destination, mandatory, immediate, false); + } + private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent) throws JMSException + { + return new FailoverRetrySupport<BasicMessageProducer, JMSException>( + new FailoverProtectedOperation<BasicMessageProducer, JMSException>() + { + public BasicMessageProducer execute() throws JMSException, FailoverException + { + checkNotClosed(); + long producerId = getNextProducerId(); + BasicMessageProducer producer = + new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + registerProducer(producerId, producer); + + return producer; + } + }, _connection).execute(); } - /* - * I could have combined the last 3 methods, but this way it improves readability - */ - private AMQTopic checkValidTopic(Topic topic) throws JMSException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - if (topic == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } - if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); - } - if (!(topic instanceof AMQTopic)) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); - } - return (AMQTopic) topic; + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void checkValidQueue(Queue queue) throws InvalidDestinationException + /** + * Declares the named exchange and type of exchange. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the exchange to declare. + * @param type The type of the exchange to declare. + * @param protocolHandler The protocol handler to process the communication through. + * @param nowait + * + * @throws AMQException If the exchange cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void declareExchange(final AMQShortString name, final AMQShortString type, + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException { - if (queue == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } - private void checkValidDestination(Destination destination) throws InvalidDestinationException + /** + * Declares a queue for a JMS destination. + * + * <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows + * the name to be reused on failover if required. In general, the destination indicates whether it wants a name + * generated or not. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to declare as a queue. + * @param protocolHandler The protocol handler to communicate through. + * + * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of + * the client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Verify the destiation is valid or throw an exception. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + throws AMQException { - if (destination == null) + /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket + + protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return amqd.getAMQQueueName(); + } + }, _connection).execute(); + } + + /** + * Undeclares the specified queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void deleteQueue(final AMQShortString queueName) throws JMSException + { + try { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); + new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeleteFrame = + QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + + return null; + } + }, _connection).execute(); + } + catch (AMQException e) + { + throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e); } } - - public AMQShortString getTemporaryTopicExchangeName() + private long getNextProducerId() { - return _connection.getTemporaryTopicExchangeName(); + return ++_nextProducerId; } - public AMQShortString getTemporaryQueueExchangeName() + private AMQProtocolHandler getProtocolHandler() { - return _connection.getTemporaryQueueExchangeName(); + return _connection.getProtocolHandler(); } - - public int getTicket() + private byte getProtocolMajorVersion() { - return _ticket; + return getProtocolHandler().getProtocolMajorVersion(); } - public void setTicket(int ticket) + private byte getProtocolMinorVersion() { - _ticket = ticket; + return getProtocolHandler().getProtocolMinorVersion(); } + private boolean hasMessageListeners() + { + return _hasMessageListeners; + } - public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException + private void markClosedConsumers() throws JMSException { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + if (_dispatcher != null) + { + _dispatcher.close(); + _dispatcher = null; + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + while (it.hasNext()) + { + final BasicMessageConsumer con = it.next(); + con.markClosed(); + } + // at this point the _consumers map will be empty } - private class SuspenderRunner implements Runnable + private void markClosedProducersAndConsumers() { - private boolean _suspend; + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + closeProducers(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + } - public SuspenderRunner(boolean suspend) + try { - _suspend = suspend; + markClosedConsumers(); } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + } + } - public void run() + /** + * Callers must hold the failover mutex before calling this method. + * + * @param consumer + * + * @throws AMQException + */ + private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + { + AMQDestination amqd = consumer.getDestination(); + + AMQProtocolHandler protocolHandler = getProtocolHandler(); + + declareExchange(amqd, protocolHandler, false); + + AMQShortString queueName = declareQueue(amqd, protocolHandler); + + // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); + + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch + if (!_immediatePrefetch) { - try - { - suspendChannel(_suspend); - } - catch (AMQException e) + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. + if (_dispatcher == null) { - _logger.warn("Unable to suspend channel"); + if (!isSuspended()) + { + try + { + suspendChannel(true); + _logger.info( + "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } } } + else + { + _logger.info("Immediately prefetching existing messages to new consumer."); + } + + try + { + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + } + catch (JMSException e) // thrown by getMessageSelector + { + throw new AMQException(e.getMessage(), e); + } + catch (FailoverException e) + { + throw new AMQException("Fail-over exception interrupted basic consume.", e); + } } + private void registerProducer(long producerId, MessageProducer producer) + { + _producers.put(new Long(producerId), producer); + } private void rejectAllMessages(boolean requeue) { @@ -2396,8 +2370,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + - ") (PDispatchQ) requeue:" + requeue); + _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + + requeue); if (messages.hasNext()) { @@ -2412,12 +2386,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag)) { if (_logger.isDebugEnabled()) { - _logger.debug("Removing message(" + System.identityHashCode(message) + - ") from _queue DT:" + message.getDeliverBody().deliveryTag); + _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" + + message.getDeliverBody().deliveryTag); } messages.remove(); @@ -2432,50 +2406,361 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - - public void rejectMessage(UnprocessedMessage message, boolean requeue) + private void resubscribeConsumers() throws AMQException { + ArrayList consumers = new ArrayList(_consumers.values()); + _consumers.clear(); - if (_logger.isTraceEnabled()) + for (Iterator it = consumers.iterator(); it.hasNext();) { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + registerConsumer(consumer, true); } - - rejectMessage(message.getDeliverBody().deliveryTag, requeue); } - public void rejectMessage(AbstractJMSMessage message, boolean requeue) + private void resubscribeProducers() throws AMQException { - if (_logger.isTraceEnabled()) + ArrayList producers = new ArrayList(_producers.values()); + _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + for (Iterator it = producers.iterator(); it.hasNext();) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + BasicMessageProducer producer = (BasicMessageProducer) it.next(); + producer.resubscribe(); } - rejectMessage(message.getDeliveryTag(), requeue); + } + + private void returnBouncedMessage(final UnprocessedMessage message) + { + _connection.performConnectionTask(new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, + message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); + + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); } - public void rejectMessage(long deliveryTag, boolean requeue) + /** + * Suspends or unsuspends this session. + * + * @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it + * should be unsuspended. + * + * @throws AMQException If the session cannot be suspended for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void suspendChannel(boolean suspend) throws AMQException // , FailoverException { - if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || - _acknowledgeMode == SESSION_TRANSACTED) + synchronized (_suspensionLock) { - if (_logger.isDebugEnabled()) + try { - _logger.debug("Rejecting delivery tag:" + deliveryTag); - } - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _suspended = suspend; + + AMQFrame channelFlowFrame = + ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + !suspend); + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } + catch (FailoverException e) + { + throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e); + } } } - public boolean isStrictAMQP() + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private class Dispatcher extends Thread { - return _strictAMQP; + + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + + public Dispatcher() + { + super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } + } + + public void close() + { + _closed.set(true); + interrupt(); + + // fixme awaitTermination + + } + + public void rejectPending(BasicMessageConsumer consumer) + { + synchronized (_lock) + { + boolean stopped = _dispatcher.connectionStopped(); + + if (!stopped) + { + _dispatcher.setConnectionStopped(true); + } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // closeConsumer + consumer.markClosed(); + + _dispatcher.setConnectionStopped(stopped); + + } + } + + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + rejectAllMessages(true); + + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + if (!consumer.isNoConsume()) + { + consumer.rollback(); + } + else + { + // should perhaps clear the _SQ here. + // consumer._synchronousQueue.clear(); + consumer.clearReceiveQueue(); + } + + } + + setConnectionStopped(isStopped); + } + + } + + public void run() + { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + + UnprocessedMessage message; + + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + + try + { + while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) + { + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + + } + } + catch (InterruptedException e) + { + // ignore + } + + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); + } + } + + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + boolean setConnectionStopped(boolean connectionStopped) + { + boolean currently; + synchronized (_lock) + { + currently = _connectionStopped; + _connectionStopped = connectionStopped; + _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); + } + } + + return currently; + } + + private void dispatchMessage(UnprocessedMessage message) + { + if (message.getDeliverBody() != null) + { + final BasicMessageConsumer consumer = + (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + + if ((consumer == null) || consumer.isClosed()) + { + if (_dispatcherLogger.isInfoEnabled()) + { + if (consumer == null) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); + } + } + // Don't reject if we're already closing + if (!_closed.get()) + { + rejectMessage(message, true); + } + } + else + { + consumer.notifyMessage(message, _channelId); + } + } + } } + /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, + boolean read) throws AMQException + { + getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), + getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write), + new BlockingMethodFrameListener(_channelId) + { + + public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException + { + if (frame instanceof AccessRequestOkBody) + { + setTicket(((AccessRequestOkBody) frame).getTicket()); + + return true; + } + else + { + return false; + } + } + }); + }*/ + + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1c3cdbcb65..3a31eda754 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,22 +20,10 @@ */ package org.apache.qpid.client; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -48,6 +36,19 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + public class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); @@ -140,9 +141,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -219,7 +220,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + + _destination); } } else @@ -468,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " close():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -481,9 +482,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -497,10 +498,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - // _logger.error("Error closing consumer: " + e, e); - JMSException jmse = new JMSException("Error closing consumer: " + e); - jmse.setLinkedException(e); - throw jmse; + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } else @@ -540,7 +542,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -572,9 +574,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { @@ -659,15 +661,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -677,55 +679,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) + if (_dups_ok_acknowledge_send) + { + if (!_session.isInRecovery()) { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - } + _session.acknowledgeMessage(msg.getDeliveryTag(), true); } + } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } - break; + break; } } @@ -757,7 +759,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -817,7 +819,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() throws JMSException + public void acknowledge() // throws JMSException { if (!isClosed()) { @@ -877,7 +879,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -907,7 +909,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -931,7 +933,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer else { _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d1237cff49..0927ca3625 100644 --- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -18,29 +18,12 @@ * under the License. * */ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ package org.apache.qpid.client; -import javax.jms.JMSException; - import org.apache.qpid.AMQException; +import javax.jms.JMSException; + /** * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions @@ -50,8 +33,6 @@ import org.apache.qpid.AMQException; * <tr><th> Responsibilities <th> Collaborations * <tr><td> Accept wrapped exceptions as a JMSException. * </table> - * - * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { @@ -71,6 +52,11 @@ public class JMSAMQException extends JMSException } } + /** + * @param s The underlying exception. + * + * @deprecated Use the other constructor and write a helpfull message. This one will be deleted. + */ public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java index 49377fdc19..037b0dc2d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,10 +21,26 @@ package org.apache.qpid.client.failover; /** - * This exception is thrown when failover is taking place and we need to let other - * parts of the client know about this. + * FailoverException is used to indicate that a synchronous request has failed to receive the reply that it is waiting + * for because the fail-over process has been started whilst it was waiting for its reply. Synchronous methods generally + * raise this exception to indicate that they must be re-tried once the fail-over process has completed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Used to indicate failure of a synchronous request due to fail-over. + * </table> + * + * @todo This exception is created and passed as an argument to a method, rather than thrown. The exception is being + * used to represent an event, passed out to other threads. Use of exceptions as arguments rather than as + * exceptions is extremly confusing. Ideally use a condition or set a flag and check it instead. + * This exceptions-as-events pattern seems to be in a similar style to Mina code, which is not pretty, but + * potentially acceptable for that reason. We have the option of extending the mina model to add more events + * to it, that is, anything that is interested in handling failover as an event occurs below the main + * amq event handler, which knows the specific interface of the qpid handlers, which can pass this down as + * an explicit event, without it being an exception. Add failover method to BlockingMethodFrameListener, + * have it set a flag or interrupt the waiting thread, which then creates and raises this exception. */ -public class FailoverException extends RuntimeException +public class FailoverException extends Exception { public FailoverException(String message) { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..dbbceff523 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,59 +20,108 @@ */ package org.apache.qpid.client.failover; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IoSession; + import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; +import java.util.concurrent.CountDownLatch; + /** - * When failover is required, we need a separate thread to handle the establishment of the new connection and - * the transfer of subscriptions. - * </p> - * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor - * thread. One significant task is the connection setup which involves a protocol exchange until a particular state - * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means - * the IO processor is not able to do anything at all. + * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the + * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport + * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new + * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that + * detected the failure and is used to handle the communication to establish a new connection. + * + * </p>The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor + * thread. The significant task is the connection setup which involves a protocol exchange until a particular state + * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work + * it needs to do to achieve the new state. + * + * <p/>The failover procedure does the following: + * + * <ol> + * <li>Sets the failing over condition to true.</li> + * <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all + * interested parties.</li> + * <li>Takes the failover mutex on the protocol connection handler.</li> + * <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition + * reset.</li> + * <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li> + * <li>Informs the AMQConnection if the connection cannot be re-established.</li> + * <li>Recreates all sessions from the old connection to the new.</li> + * <li>Resets the failing over condition and releases the mutex.</li> + * </ol> + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Update fail-over state <td> {@link AMQProtocolHandler} + * </table> + * + * @todo The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition + * then could change over to using them. 1.4 support still needed. + * + * @todo If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the + * condition, they will never be released. It might be an idea to reset the condition in a finally block. + * + * @todo Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection. + * + * @todo Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on + * {@link org.apache.qpid.protocol.AMQMethodListener}. */ public class FailoverHandler implements Runnable { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(FailoverHandler.class); + /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ private final IoSession _session; + + /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; - /** - * Used where forcing the failover host - */ + /** Used to hold the host to fail over to. This is optional and if not set a reconnect to the previous host is tried. */ private String _host; - /** - * Used where forcing the failover port - */ + /** Used to hold the port to fail over to. */ private int _port; + /** + * Creates a failover handler on a protocol session, for a particular MINA session (network connection). + * + * @param amqProtocolHandler The protocol handler that spans the failover. + * @param session The MINA session, for the failing connection. + */ public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) { _amqProtocolHandler = amqProtocolHandler; _session = session; } + /** + * Performs the failover procedure. See the class level comment, {@link FailoverHandler}, for a description of the + * failover procedure. + */ public void run() { if (Thread.currentThread().isDaemon()) { throw new IllegalStateException("FailoverHandler must run on a non-daemon thread."); } - //Thread.currentThread().setName("Failover Thread"); + // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of + // the fail over. _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1)); // We wake up listeners. If they can handle failover, they will extend the - // FailoverSupport class and will in turn block on the latch until failover - // has completed before retrying the operation + // FailoverRetrySupport class and will in turn block on the latch until failover + // has completed before retrying the operation. _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); // Since failover impacts several structures we protect them all with a single mutex. These structures @@ -93,14 +142,18 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Redirect was vetoed by client")); } else { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Failover was vetoed by client")); } + _amqProtocolHandler.getFailoverLatch().countDown(); _amqProtocolHandler.setFailoverLatch(null); + return; } @@ -119,12 +172,12 @@ public class FailoverHandler implements Runnable { failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(); } + if (!failoverSucceeded) { _amqProtocolHandler.setStateManager(existingStateManager); - _amqProtocolHandler.getConnection().exceptionReceived( - new AMQDisconnectedException("Server closed connection and no failover " + - "was successful")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Server closed connection and no failover " + "was successful")); } else { @@ -140,6 +193,7 @@ public class FailoverHandler implements Runnable { _logger.info("Client vetoed automatic resubscription"); } + _amqProtocolHandler.getConnection().fireFailoverComplete(); _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED); _logger.info("Connection failover completed successfully"); @@ -148,35 +202,36 @@ public class FailoverHandler implements Runnable { _logger.info("Failover process failed - exception being propagated by protocol handler"); _amqProtocolHandler.setFailoverState(FailoverState.FAILED); - try - { - _amqProtocolHandler.exceptionCaught(_session, e); - } + /*try + {*/ + _amqProtocolHandler.exceptionCaught(_session, e); + /*} catch (Exception ex) { _logger.error("Error notifying protocol session of error: " + ex, ex); - } + }*/ } } } - _amqProtocolHandler.getFailoverLatch().countDown(); - } - public String getHost() - { - return _host; + _amqProtocolHandler.getFailoverLatch().countDown(); } + /** + * Sets the host name to fail over to. This is optional and if not set a reconnect to the previous host is tried. + * + * @param host The host name to fail over to. + */ public void setHost(String host) { _host = host; } - public int getPort() - { - return _port; - } - + /** + * Sets the port to fail over to. + * + * @param port The port to fail over to. + */ public void setPort(int port) { _port = port; diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java new file mode 100644 index 0000000000..dece1b6c3f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -0,0 +1,54 @@ +package org.apache.qpid.client.failover;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
+ * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
+ * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
+ * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
+ * for example, because the caller already holds locks preventing that condition from arising.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Perform a fail-over protected operation with no handling of fail-over conditions.
+ * </table>
+ */
+public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delegates to another continuation which is to be provided with fail-over handling.
+ *
+ * @return The return value from the delegated to continuation.
+ * @throws E Any exception that the delegated to continuation may raise.
+ */
+ public T execute() throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new IllegalStateException("Fail-over interupted no-op failover support. "
+ + "No-op support should only be used where the caller is certaing fail-over cannot occur.", e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java new file mode 100644 index 0000000000..efb7bf8aed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java @@ -0,0 +1,30 @@ +package org.apache.qpid.client.failover;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
+ * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
+ * for failover protected operations, in order to provide different handling schemes when failovers occurr.
+ *
+ * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
+ * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
+ * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
+ * will mask the exception.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Perform an operation that may be interrupted by fail-over.
+ * </table>
+ */
+public interface FailoverProtectedOperation<T, E extends Exception>
+{
+ /**
+ * Performs the continuations work.
+ *
+ * @return Provdes scope for the continuation to return an arbitrary value.
+ *
+ * @throws FailoverException If the operation is interrupted by a fail-over notification.
+ */
+ public abstract T execute() throws E, FailoverException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java new file mode 100644 index 0000000000..1e4908976b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -0,0 +1,130 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+
+import javax.jms.JMSException;
+
+/**
+ * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
+ * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
+ * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
+ * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
+ * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
+ * FailoverException, which is used to signal the failure of the original condition).
+ *
+ * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
+ * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
+ * These are used like a lock and condition variable.
+ *
+ * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
+ * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
+ * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
+ * methods that should not be attempted when a fail-over is in progress.
+ *
+ * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
+ * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
+ * until it succeeds. Synchronous methods are usually coordinated with a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
+ * to start and throws a FailoverException in response to this.
+ *
+ * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
+ * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
+ * </table>
+ *
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
+ * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
+ * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
+ *
+ * @todo InterruptedException not handled well.
+ */
+public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** Used for debugging. */
+ private static final Logger _log = Logger.getLogger(FailoverRetrySupport.class);
+
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
+ * until the operation throws AMQException or succeeds without being interrupted by fail-over.
+ *
+ * @return The result of executing the continuation.
+ *
+ * @throws AMQException Any underlying exception is allowed to fall through.
+ */
+ public T execute() throws E
+ {
+ while (true)
+ {
+ try
+ {
+ connection.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (connection.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _log.debug("Failover exception caught during operation: " + e, e);
+ }
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java index a005bc5fdf..41bac34a34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java @@ -1,65 +1,28 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client.failover; -import javax.jms.JMSException; - -import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -public abstract class FailoverSupport +/** + * FailoverSupport defines an interface for different types of fail-over handlers, that provide different types of + * behaviour for handling fail-overs during operations that can be interrupted by the fail-over process. For example, + * the support could automatically retry once the fail-over process completes, could prevent an operation from being + * started whilst fail-over is running, or could quietly abandon the operation or raise an exception, and so on. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Perform a fail-over protected operation with handling for fail-over conditions. + * </table> + * + * @todo Continuation, extend some sort of re-usable Continuation interface, which might look very like this one. + */ +public interface FailoverSupport<T, E extends Exception> { - private static final Logger _log = Logger.getLogger(FailoverSupport.class); - - public Object execute(AMQConnection con) throws JMSException - { - // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. - // Any method that can potentially block for any reason should use this class so that deadlock will not - // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) - // that might be causing a block. When that happens, the exception is caught here and the mutex is released - // before waiting for the failover to complete (either successfully or unsuccessfully). - while (true) - { - try - { - con.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.info("Interrupted: " + e, e); - return null; - } - synchronized (con.getFailoverMutex()) - { - try - { - return operation(); - } - catch (FailoverException e) - { - _log.info("Failover exception caught during operation: " + e, e); - } - } - } - } - - protected abstract Object operation() throws JMSException; + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E; } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index addef94215..5bf7bffc63 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,22 @@ */ package org.apache.qpid.client.protocol; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; + import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; @@ -60,22 +58,86 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +/** + * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the + * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the + * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the + * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, + * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in + * terms of "message received" and so on. + * + * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is + * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public + * API calls through which an individual connection can be manipulated. This protocol handler talks to the network + * through MINA, in a behind the scenes role; it is not an exposed part of the client API. + * + * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, + * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in + * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol + * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions + * in the event of failover. See below for more information about this. + * + * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named + * attributes. A more convenient, type-safe, container for session data is provided in the form of + * {@link AMQProtocolSession}. + * + * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session + * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper + * as described above). This event handler is different, because dealing with failover complicates things. To the + * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but + * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot + * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old + * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection + * and the protocol session data is held outside of the MINA IOSession. + * + * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from + * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, + * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create the filter chain to filter this handlers events. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * + * <tr><td> Maintain fail-over state. + * <tr><td> + * </table> + * + * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? + * + * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. + */ public class AMQProtocolHandler extends IoHandlerAdapter { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection + * instances and protocol handler instances. */ private AMQConnection _connection; /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; + /** Holds the state of the protocol session. */ private AMQStateManager _stateManager = new AMQStateManager(); + /** Holds the method listeners, */ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); /** @@ -91,15 +153,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private FailoverState _failoverState = FailoverState.NOT_STARTED; + /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** + * Creates a new protocol handler, associated with the specified client connection instance. + * + * @param con The client connection that this is the event handler for. + */ public AMQProtocolHandler(AMQConnection con) { _connection = con; } + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. + * + * @param session The MINA session. + * + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ public void sessionCreated(IoSession session) throws Exception { _logger.debug("Protocol session created for session " + System.identityHashCode(session)); @@ -119,16 +197,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_connection.getSSLConfiguration() != null) { SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLContextFactory sslFactory = + new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } - try { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); threadModel.getAsynchronousReadFilter().createNewJobForSession(session); threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); @@ -142,35 +219,38 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.init(); } - public void sessionOpened(IoSession session) throws Exception - { - //System.setProperty("foo", "bar"); - } - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. + * Called when the network connection is closed. This can happen, either because the client explicitly requested + * that the connection be closed, in which case nothing is done, or because the connection died. In the case + * where the connection died, an attempt to failover automatically to a new connection may be started. The failover + * process will be started, provided that it is the clients policy to allow failover, and provided that a failover + * has not already been started or failed. + * + * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught} + * may be called first followed by this method. This depends on whether the client was trying to send data at the + * time of the failure. * - * @param session + * @param session The MINA session. * - * @throws Exception + * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and + * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) throws Exception + public void sessionClosed(IoSession session) { if (_connection.isClosed()) { - _logger.info("Session closed called by client"); + _logger.debug("Session closed called by client"); } else { - _logger.info("Session closed called with failover state currently " + _failoverState); + _logger.debug("Session closed called with failover state currently " + _failoverState); - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions // known through the policy settings. if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) { - _logger.info("FAILOVER STARTING"); + _logger.debug("FAILOVER STARTING"); if (_failoverState == FailoverState.NOT_STARTED) { _failoverState = FailoverState.IN_PROGRESS; @@ -178,12 +258,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter } else { - _logger.info("Not starting failover as state currently " + _failoverState); + _logger.debug("Not starting failover as state currently " + _failoverState); } } else { - _logger.info("Failover not allowed by policy."); + _logger.debug("Failover not allowed by policy."); // or already in progress? if (_logger.isDebugEnabled()) { @@ -192,19 +272,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState != FailoverState.IN_PROGRESS) { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.")); } else { - _logger.info("sessionClose() failover in progress"); + _logger.debug("sessionClose() failover in progress"); } } } - _logger.info("Protocol Session [" + this + "] closed"); + _logger.debug("Protocol Session [" + this + "] closed"); } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -223,25 +302,32 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { - //write heartbeat frame: + // write heartbeat frame: _logger.debug("Sent heartbeat"); session.write(HeartbeatBody.FRAME); HeartbeatDiagnostics.sent(); } else if (IdleStatus.READER_IDLE.equals(status)) { - //failover: + // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); session.close(); } } - public void exceptionCaught(IoSession session, Throwable cause) throws Exception + /** + * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an + * IOException, MINA will close the connection automatically. + * + * @param session The MINA session. + * @param cause The exception that triggered this event. + */ + public void exceptionCaught(IoSession session, Throwable cause) { if (_failoverState == FailoverState.NOT_STARTED) { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if (cause instanceof AMQConnectionClosedException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -250,8 +336,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter sessionClosed(session); } - //FIXME Need to correctly handle other exceptions. Things like ... -// if (cause instanceof AMQChannelClosedException) + // FIXME Need to correctly handle other exceptions. Things like ... + // if (cause instanceof AMQChannelClosedException) // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -261,6 +347,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter else if (_failoverState == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); + // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); @@ -297,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; - if (debug && (msgNumber % 1000 == 0)) + if (debug && ((msgNumber % 1000) == 0)) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } @@ -310,72 +397,77 @@ public class AMQProtocolHandler extends IoHandlerAdapter switch (bodyFrame.getFrameType()) { - case AMQMethodBody.TYPE: + case AMQMethodBody.TYPE: - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - try - { + try + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } - catch (AMQException e) + + if (!wasAnyoneInterested) { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + getStateManager().error(e); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); } - exceptionCaught(session, e); } - break; - case ContentHeaderBody.TYPE: + exceptionCaught(session, e); + } - _protocolSession.messageContentHeaderReceived(frame.getChannel(), - (ContentHeaderBody) bodyFrame); - break; + break; - case ContentBody.TYPE: + case ContentHeaderBody.TYPE: - _protocolSession.messageContentBodyReceived(frame.getChannel(), - (ContentBody) bodyFrame); - break; + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - case HeartbeatBody.TYPE: + case ContentBody.TYPE: - if (debug) - { - _logger.debug("Received heartbeat"); - } - break; + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if (debug) + { + _logger.debug("Received heartbeat"); + } + + break; - default: + default: } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -387,10 +479,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); - if (debug && (sentMessages % 1000 == 0)) + if (debug && ((sentMessages % 1000) == 0)) { _logger.debug("Sent " + _messagesOut + " protocol messages"); } + _connection.bytesSent(session.getWrittenBytes()); if (debug) { @@ -408,7 +501,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _frameListeners.remove(listener); } - */ + */ public void attainState(AMQState s) throws AMQException { getStateManager().attainState(s); @@ -437,9 +530,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) + throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } @@ -451,9 +543,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, + long timeout) throws AMQException, FailoverException { try { @@ -461,9 +552,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); + return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener } catch (AMQException e) { @@ -478,25 +570,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); + return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), + timeout); } - - public void closeSession(AMQSession session) throws AMQException { _protocolSession.closeSession(session); } + /** + * Closes the connection. + * + * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed + * anyway. + * + * @param timeout The timeout to wait for an acknowledgement to the close request. + * + * @throws AMQException If the close fails for any reason. + */ public void closeConnection(long timeout) throws AMQException { getStateManager().changeState(AMQState.CONNECTION_CLOSING); @@ -504,13 +604,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText + final AMQFrame frame = + ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), + _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection.")); // replyText try { @@ -521,8 +621,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolSession.closeProtocolSession(false); } - - + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } /** @return the number of bytes read from this protocol session */ @@ -604,7 +706,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter return _protocolSession.getProtocolMajorVersion(); } - public byte getProtocolMinorVersion() { return _protocolSession.getProtocolMinorVersion(); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..86db9d5859 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,71 +27,137 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +/** + * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of + * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or + * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this + * differs from a 'rendezvous' in that sense. + * + * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response. + * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they + * have been completed. + * + * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to + * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to + * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The + * channel id must be passed to the constructor. + * + * <p/>Errors from the producer are rethrown to the consumer. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent} + * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody} + * <tr><td> Block until a method is handled by the delegated to handler. + * <tr><td> Propagate the most recent exception to the consumer. + * </table> + * + * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a + * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations + * seem to use it. So wrapping the listeners is possible. + * + * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener, + * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot + * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for + * method has been received. + * + * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry + * when this happens. At the very least, restore the interrupted status flag. + * + * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to + * check that SynchronousQueue has a non-blocking put method available. + */ public abstract class BlockingMethodFrameListener implements AMQMethodListener { + /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; - public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException; - + /** Used to protect the shared event and ready flag between the producer and consumer. */ private final Object _lock = new Object(); - /** - * This is set if there is an exception thrown from processCommandFrame and the - * exception is rethrown to the caller of blockForFrame() - */ + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; + /** Holds the channel id for the channel upon which this listener is waiting for a response. */ protected int _channelId; + /** Holds the incoming method. */ protected AMQMethodEvent _doneEvt = null; + /** + * Creates a new method listener, that filters incoming method to just those that match the specified channel id. + * + * @param channelId The channel id to filter incoming methods with. + */ public BlockingMethodFrameListener(int channelId) { _channelId = channelId; } /** - * This method is called by the MINA dispatching thread. Note that it could - * be called before blockForFrame() has been called. + * Delegates any additional handling of the incoming methods to another handler. * - * @param evt the frame event - * @return true if the listener has dealt with this frame - * @throws AMQException + * @param channelId The channel id of the incoming method. + * @param frame The method body. + * + * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise. */ - public boolean methodReceived(AMQMethodEvent evt) throws AMQException + public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException; + + /** + * Informs this listener that an AMQP method has been received. + * + * @param evt The AMQP method. + * + * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise. + */ + public boolean methodReceived(AMQMethodEvent evt) // throws AMQException { AMQMethodBody method = evt.getMethod(); - try + /*try + {*/ + boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); + + if (ready) { - boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - if (ready) + // we only update the flag from inside the synchronized block + // so that the blockForFrame method cannot "miss" an update - it + // will only ever read the flag from within the synchronized block + synchronized (_lock) { - // we only update the flag from inside the synchronized block - // so that the blockForFrame method cannot "miss" an update - it - // will only ever read the flag from within the synchronized block - synchronized (_lock) - { - _doneEvt = evt; - _ready = ready; - _lock.notify(); - } + _doneEvt = evt; + _ready = ready; + _lock.notify(); } - return ready; } + + return ready; + + /*} catch (AMQException e) { error(e); // we rethrow the error here, and the code in the frame dispatcher will go round // each listener informing them that an exception has been thrown throw e; - } + }*/ } /** - * This method is called by the thread that wants to wait for a frame. + * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout + * has passed. + * + * @param timeout The timeout in milliseconds. + * + * @return The AMQP method that was received. + * + * @throws AMQException + * @throws FailoverException */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException + public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { synchronized (_lock) { @@ -117,24 +183,25 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener catch (InterruptedException e) { // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess -// if (!_ready && timeout != -1) -// { -// _error = new AMQException("Server did not respond timely"); -// _ready = true; -// } + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } } } } + if (_error != null) { if (_error instanceof AMQException) { - throw(AMQException) _error; + throw (AMQException) _error; } else if (_error instanceof FailoverException) { - // This should ensure that FailoverException is not wrapped and can be caught. - throw(FailoverException) _error; // needed to expose FailoverException. + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. } else { @@ -156,6 +223,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // set the error so that the thread that is blocking (against blockForFrame()) // can pick up the exception and rethrow to the caller _error = e; + synchronized (_lock) { _ready = true; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 1c70ded62a..623591e0b6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener _expectedClass = expectedClass; } - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException { return _expectedClass.isInstance(frame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 642b928d81..0fc39a9318 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.util; - +import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.Iterator; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single * thread adding items and a single (different) thread removing items. + * + * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ public class FlowControllingBlockingQueue { @@ -81,6 +82,7 @@ public class FlowControllingBlockingQueue } } } + return o; } @@ -104,4 +106,3 @@ public class FlowControllingBlockingQueue return _queue.iterator(); } } - diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 51bbe7d0e6..c201e88104 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -14,42 +14,43 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.channelclose; import junit.framework.TestCase; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.jms.JMSException; -import javax.jms.ExceptionListener; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.jms.Queue; +import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.jms.ConnectionListener; -import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con TransportConnection.killAllVMBrokers(); } - /* close channel, use chanel with same id ensure error. - */ - public void testReusingChannelAfterFullClosure() + */ + public void testReusingChannelAfterFullClosure() throws Exception { _connection = newConnection(); - //Create Producer + // Create Producer try { _connection.start(); @@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con { _logger.info("Exception occured was:" + e.getErrorCode()); } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con /* close channel and send guff then send ok no errors */ - public void testSendingMethodsAfterClose() + public void testSendingMethodsAfterClose() throws Exception { try { - _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); ((AMQConnection) _connection).setConnectionListener(this); - _connection.setExceptionListener(this); - //Change the StateManager for one that doesn't respond with Close-OKs + // Change the StateManager for one that doesn't respond with Close-OKs AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); _connection.start(); - //Test connection + // Test connection checkSendingMessage(); - //Set StateManager to manager that ignores Close-oks + // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con createChannelAndTest(TEST_CHANNEL); - //Test connection is still ok + // Test connection is still ok checkSendingMessage(); @@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } } - private void createChannelAndTest(int channel) + private void createChannelAndTest(int channel) throws FailoverException { - //Create A channel + // Create A channel try { createChannel(channel); @@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = + ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } - private void checkSendingMessage() throws JMSException { TEST++; @@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con AMQConnection connection = null; try { - connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); connection.setConnectionListener(this); @@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con fail("Creating new connection when:" + e.getMessage()); } - return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type if (nowait) { @@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, + SYNC_TIMEOUT); } } - private void createChannel(int channelId) throws AMQException + private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); } - public void onException(JMSException jmsException) { - //_logger.info("CCT" + jmsException); + // _logger.info("CCT" + jmsException); fail(jmsException.getMessage()); } public void bytesSent(long count) - { - } + { } public void bytesReceived(long count) - { - - } + { } public boolean preFailover(boolean redirect) { @@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void failoverComplete() - { - } + { } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index d52707d965..58ac8294f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close; import junit.framework.TestCase; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; -import javax.jms.TextMessage; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidClientConnection; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends TestCase { @@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) // clean up + if (!passed) // clean up { QpidClientConnection conn = new QpidClientConnection(BROKER); @@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } + TransportConnection.killVMBroker(1); } @@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; - long messageLog[] = new long[numTestMessages + 1]; + long[] messageLog = new long[numTestMessages + 1]; _logger.info("consuming..."); Message msg = consumer.receive(1000); @@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase int msgindex = msg.getIntProperty("index"); if (messageLog[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); } if (dt == 0) @@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase messageLog[msgindex] = dt; - //get Next message + // get Next message msg = consumer.receive(1000); } @@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase for (long b : messageLog) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase index++; } + assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); @@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase t1.start(); t2.start(); t3.start(); -// t4.start(); + // t4.start(); try { @@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase for (long b : receieved) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase list.append(b); failed++; } + index++; } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); passed = true; @@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase int msgindex = result.getIntProperty("index"); if (receieved[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); } if (dt == 0) @@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase receieved[msgindex] = dt; } - count++; - if (count % 100 == 0) + if ((count % 100) == 0) { _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); } @@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; -// while (run < 10) + // while (run < 10) { run++; @@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase assertNotNull("Message should not be null", msg); - // As we have not ack'd message will be requeued. _logger.debug("Close Consumer"); consumer.close(); @@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase } } -}
\ No newline at end of file +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0e718da19b..8d96977df2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.Session; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; /** @@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase { TransportConnection.createVMBroker(1); } + testMethod++; queue += testMethod; - newConnection(); } @@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); } - /** * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order * * @throws Exception On error */ - public void testSend2ThenRollback() throws Exception + /*public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase } assertNull("test message should be null", result); - } + }*/ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else // or it will be msg 2 arriving the first time due to latency. + else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); assertEquals("2", ((TextMessage) result).getText()); @@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase } - public void testPutThenRollbackThenGet() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index 195ed79dab..d52da06f76 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -1,25 +1,25 @@ package org.apache.qpid.testutil; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; public class QpidClientConnection implements ExceptionListener { - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); private boolean transacted = true; @@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener setPrefetch(5000); } - public void connect() throws JMSException { if (!connected) { /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { @@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - _logger.info("starting connection"); connection.start(); @@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener this.prefetch = prefetch; } - /** override as necessary */ public void onException(JMSException exception) { @@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener _logger.info("consumed: " + messagesReceived); } } - diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java index 931c6cd87a..eb736d437f 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -23,14 +23,17 @@ package org.apache.qpid; import org.apache.qpid.protocol.AMQConstant; /** - * AMQConnectionClosedException indicates that an operation cannot be performed becauase a connection has been closed. + * AMQConnectionClosedException indicates that a connection has been closed. + * + * <p/>This exception is really used as an event, in order that the method handler that raises it creates an event + * which is propagated to the io handler, in order to notify it of the connection closure. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represents a failed operation on a closed conneciton. + * <tr><td> Represents a the closure of a connection. * </table> * - * @todo Does this duplicate AMQConnectionException? + * @todo Should review where exceptions-as-events */ public class AMQConnectionClosedException extends AMQException { diff --git a/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java b/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java index 883e13e5e6..a0574efa72 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java @@ -14,13 +14,22 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid; - +/** + * AMQPInvalidClassException indicates an error when trying to store an illegally typed argument in a field table. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents illegal argument type for field table values. + * </table> + * + * @todo Could just re-use an exising exception like IllegalArgumentException or ClassCastException. + */ public class AMQPInvalidClassException extends RuntimeException { public AMQPInvalidClassException(String s) diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 808272e9ec..2fbeeda1d4 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.AMQMethodBody; * * <p/>An event listener may be associated with a particular context, usually an AMQP channel, and in addition to * receiving method events will be notified of errors on that context. This enables listeners to perform any clean - * up that they need to do before the context is closed. + * up that they need to do before the context is closed or retried. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities @@ -64,8 +64,6 @@ public interface AMQMethodListener * any necessary clean-up for the context. * * @param e The underlying exception that is the source of the error. - * - * @todo Consider narrowing the exception, or wrapping it. */ void error(Exception e); } diff --git a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java index faeb9d7167..10f6a27293 100644 --- a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java @@ -26,6 +26,8 @@ package org.apache.qpid.util; * <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
+ *
+ * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays.
*/
public class PrettyPrintingUtils
{
diff --git a/java/perftests/RunningPerformanceTests.txt b/java/perftests/RunningPerformanceTests.txt index 95051c088e..54291483bf 100644 --- a/java/perftests/RunningPerformanceTests.txt +++ b/java/perftests/RunningPerformanceTests.txt @@ -116,7 +116,7 @@ The specific performance test cases for QPid are implemented as extensions to JU The most common test case to run is implemented in the class PingAsyncTestPerf, which sends and recieves messages simultaneously. This class uses a PingPongProdicer to do its sending and receiving, and wraps it in a suitable way to make it callable through the extended JUnit test runner. This class also accpets another parameter "batchSize" with a default of "1000". This tells the test how many messages to send before stopping sending and waiting for them all to come back. The actual value entered does not matter too much, but typically values larger than 1000 are used to ensure that there is a reasonable opportunity for simultaneous sending and receiving, and less than 10000 to ensure that each test method invocation does not go on for too long. -The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure peak performance. +The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure 'burst' performance. The reliability/burn in tests, test the broker running at slightly below its maximum throughput for a period of 24 hours. Their purpose is to check that the broker remains stable under load for a reasonable duration, in order to provide some confidence in the long-term stability of its process. These tests are intended to be run as a two step process. The first two tests run for 10 minutes and are used to asses the broker throughput for the test. The output from these tests are to be fed into the rate limiter for the second set of tests, so that the broker may be set up to run at slightly below its maximum throughput for the 24 hour duration. It is suggested that 90% of the rate achieved by the first two tests should be used for this. diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 17cc4a3be9..8d03755efc 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -305,16 +305,16 @@ <PTM-Qpid-2-1M>-n PTM-Qpid-2-1M -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true transacted=false commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationsCount=1 rate=0 maxPending=20000000</PTM-Qpid-2-1M> <!-- Failover Tests. --> - <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1> - <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2> - <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3> - <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4> - <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5> - <FT-Qpid-1-P>-n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1-P> - <FT-Qpid-2-P>-n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2-P> - <FT-Qpid-3-P>-n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3-P> - <FT-Qpid-4-P>-n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4-P> - <FT-Qpid-5-P>-n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5-P> + <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1> + <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2> + <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3> + <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4> + <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5> + <FT-Qpid-1-P>-n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1-P> + <FT-Qpid-2-P>-n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2-P> + <FT-Qpid-3-P>-n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3-P> + <FT-Qpid-4-P>-n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4-P> + <FT-Qpid-5-P>-n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5-P> </commands> </configuration> diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index c6cbac0ba8..6f62931338 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -1,7 +1,7 @@ package org.apache.qpid.server.failure; import junit.framework.TestCase; -import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.testutil.QpidClientConnectionHelper; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -17,7 +17,7 @@ public class HeapExhaustion extends TestCase { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnectionHelper conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -32,7 +32,7 @@ public class HeapExhaustion extends TestCase protected void setUp() throws Exception { - conn = new QpidClientConnection(BROKER); + conn = new QpidClientConnectionHelper(BROKER); conn.setVirtualHost(vhost); conn.connect(); diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java index d3f064293e..fcc3adee13 100644 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java @@ -1,271 +1,275 @@ -package org.apache.qpid.testutil; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.JMSAMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.ExceptionListener; -import javax.jms.Session; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.TextMessage; -import javax.jms.DeliveryMode; - -public class QpidClientConnection implements ExceptionListener -{ - - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); - - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection(String broker) - { - super(); - setVirtualHost("/test"); - setBrokerList(broker); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - sender.setDeliveryMode(deliveryMode); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) - { - connect(); - } - - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; - - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) - { - result = null; - } - else - { - _logger.info("warning: received non-text message"); - result = message; - } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - - session.commit(); - consumer.close(); - _logger.info("consumed: " + messagesReceived); - } -} +package org.apache.qpid.testutil;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very
+ * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order
+ * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to
+ * provide the new features needed.
+ */
+public class QpidClientConnectionHelper implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnectionHelper(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ sender.setDeliveryMode(deliveryMode);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}
|