diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-09-11 11:39:10 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-09-11 11:39:10 +0000 |
commit | b622a494bd6fae78c197814da937f7fc20072e4a (patch) | |
tree | fd2cbd58f446a66b15c211877540e52eb1134545 | |
parent | e2a50b3ed73db0062554014ad97c2309f8016906 (diff) | |
download | qpid-python-b622a494bd6fae78c197814da937f7fc20072e4a.tar.gz |
QPID-590 : Provide test case and resolution to prevent deadlock occurring on the client when two threads work on the AMQSession object.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@574555 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 490 insertions, 277 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4e259f651c..bdebc8e50a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -72,7 +72,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,6 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -206,14 +204,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * subscriptions between executions of the client. */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used to hold incoming messages. @@ -241,11 +239,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * consumer. */ private Map<AMQShortString, BasicMessageConsumer> _consumers = - new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); + new ConcurrentHashMap<Destination, AtomicInteger>(); /** * Used as a source of unique identifiers for producers within the session. @@ -305,15 +303,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = - Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); + Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); _immediatePrefetch = - _strictAMQP - || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _strictAMQP + || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; _transacted = transacted; @@ -334,31 +332,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); - } - } - }); + new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() + { + public void aboveThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(true)).start(); + } + } + + public void underThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(false)).start(); + } + } + }); } else { @@ -377,10 +375,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, - int defaultPrefetchLow) + int defaultPrefetchLow) { this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, - defaultPrefetchLow); + defaultPrefetchLow); } // ===== JMS Session methods. @@ -435,8 +433,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void acknowledgeMessage(long deliveryTag, boolean multiple) { final AMQFrame ackFrame = - BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - multiple); + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); if (_logger.isDebugEnabled()) { @@ -463,27 +461,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException + final AMQShortString exchangeName) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueBind = + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); - return null; - } - }, _connection).execute(); + return null; + } + }, _connection).execute(); } /** @@ -510,59 +508,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isInfoEnabled()) { _logger.info("Closing session: " + this + ":" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } - synchronized(_messageDeliveryLock) + synchronized (_messageDeliveryLock) { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_connection.getFailoverMutex()) - { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_connection.getFailoverMutex()) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { + // we pass null since this is not an error case + closeProducersAndConsumers(null); - getProtocolHandler().closeSession(this); + try + { - final AMQFrame frame = - ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - finally - { - _connection.deregisterSession(_channelId); + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } - } } /** @@ -572,26 +570,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - synchronized(_messageDeliveryLock) - { - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) + synchronized (_connection.getFailoverMutex()) { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + _closed.set(true); + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); - } + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } @@ -626,7 +624,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), - TxCommitOkBody.class); + TxCommitOkBody.class); } catch (AMQException e) { @@ -709,12 +707,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, - messageSelector, null, true, true); + messageSelector, null, true, true); } public MessageConsumer createConsumer(Destination destination) throws JMSException @@ -722,7 +720,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null, - false, false); + false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -730,20 +728,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, - messageSelector, null, false, false); + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, - messageSelector, null, false, false); + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector) throws JMSException + String selector) throws JMSException { checkValidDestination(destination); @@ -751,7 +749,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector) throws JMSException + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); @@ -759,7 +757,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector, FieldTable rawSelector) throws JMSException + String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); @@ -767,12 +765,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector, FieldTable rawSelector) throws JMSException + boolean exclusive, String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, - false); + false); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException @@ -787,7 +785,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (subscriber.getTopic().equals(topic)) { throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); + + name); } else { @@ -815,7 +813,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); + + "for creation durableSubscriber. Requesting queue deletion regardless."); } deleteQueue(dest.getAMQQueueName()); @@ -825,7 +823,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -842,7 +840,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkNotClosed(); checkValidTopic(topic); @@ -899,13 +897,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException + throws JMSException { return createProducerImpl(destination, mandatory, immediate); } public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, - boolean waitUntilSent) throws JMSException + boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } @@ -955,28 +953,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException + final boolean exclusive) throws AMQException { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueDeclare = + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - - return null; - } - }, _connection).execute(); + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } /** @@ -1269,8 +1267,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isDebugEnabled()) { _logger.debug("Message[" - + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) - + "] received in session with channel id " + _channelId); + + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) + + "] received in session with channel id " + _channelId); } if (message.getDeliverBody() == null) @@ -1343,15 +1341,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { _connection.getProtocolHandler().syncWrite( - BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue - , BasicRecoverOkBody.class); + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); } if (!isSuspended) @@ -1401,8 +1399,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } AMQFrame basicRejectBody = - BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - requeue); + BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); _connection.getProtocolHandler().writeFrame(basicRejectBody); } @@ -1442,7 +1440,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); if (!isSuspended) { @@ -1521,7 +1519,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); + + " Requesting queue deletion regardless."); } deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); @@ -1542,8 +1540,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -1586,9 +1584,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } BasicMessageConsumer consumer = - new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, - exclusive, _acknowledgeMode, noConsume, autoClose); + new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, + _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); if (_messageListener != null) { @@ -1608,7 +1606,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQInvalidRoutingKeyException e) { JMSException ide = - new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); + new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); ide.setLinkedException(e); throw ide; } @@ -1694,26 +1692,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException + throws JMSException { try { AMQMethodEvent response = - new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() - { - public AMQMethodEvent execute() throws AMQException, FailoverException - { - AMQFrame boundFrame = - ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), - getProtocolMinorVersion(), exchangeName, // exchange - queueName, // queue - routingKey); // routingKey + new FailoverRetrySupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = + ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), + getProtocolMinorVersion(), exchangeName, // exchange + queueName, // queue + routingKey); // routingKey - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - } - }, _connection).execute(); + } + }, _connection).execute(); // Extract and return the response code from the query. ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); @@ -1783,9 +1781,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - synchronized void startDistpatcherIfNecessary() + void startDistpatcherIfNecessary() { + //If we are the dispatcher then we don't need to check we are started + if (Thread.currentThread() == _dispatcher) + { + return; + } + // If IMMEDIATE_PREFETCH is not set then we need to start fetching + // This is final per session so will be multi-thread safe. if (!_immediatePrefetch) { // We do this now if this is the first call on a started connection @@ -1922,14 +1927,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) { throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on a temporary topic created in another session"); + "Cannot create a subscription on a temporary topic created in another session"); } if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " - + topic.getClass().getName()); + "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + + topic.getClass().getName()); } return (AMQTopic) topic; @@ -2029,7 +2034,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName */ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { // need to generate a consumer tag on the client so we can exploit the nowait flag AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); @@ -2058,14 +2063,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = - BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket + BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket if (nowait) { @@ -2085,13 +2090,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) - throws JMSException + throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) throws JMSException + final boolean immediate, final boolean waitUntilSent) throws JMSException { return new FailoverRetrySupport<BasicMessageProducer, JMSException>( new FailoverProtectedOperation<BasicMessageProducer, JMSException>() @@ -2101,8 +2106,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); long producerId = getNextProducerId(); BasicMessageProducer producer = - new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); registerProducer(producerId, producer); return producer; @@ -2130,29 +2135,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame exchangeDeclare = + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type - - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); - - return null; - } - }, _connection).execute(); + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } /** @@ -2177,7 +2182,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) - throws AMQException + throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ return new FailoverNoopSupport<AMQShortString, AMQException>( @@ -2192,15 +2197,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } AMQFrame queueDeclare = - QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); @@ -2225,22 +2230,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException { - public Object execute() throws AMQException, FailoverException - { - AMQFrame queueDeleteFrame = + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); - return null; - } - }, _connection).execute(); + return null; + } + }, _connection).execute(); } catch (AMQException e) { @@ -2359,7 +2364,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { suspendChannel(true); _logger.info( - "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); + "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { @@ -2408,7 +2413,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isInfoEnabled()) { _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" - + requeue); + + requeue); if (messages.hasNext()) { @@ -2428,7 +2433,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliverBody().deliveryTag); + + message.getDeliverBody().deliveryTag); } messages.remove(); @@ -2469,44 +2474,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void returnBouncedMessage(final UnprocessedMessage message) { _connection.performConnectionTask(new Runnable() + { + public void run() { - public void run() + try { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, - message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); + message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); } - catch (Exception e) + else if (errorCode == AMQConstant.NO_ROUTE) { - _logger.error( + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + + } + catch (Exception e) + { + _logger.error( "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } } - }); + } + }); } /** @@ -2533,8 +2538,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _suspended = suspend; AMQFrame channelFlowFrame = - ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - !suspend); + ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + !suspend); _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } @@ -2670,7 +2675,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _lock.wait(); } - synchronized(_messageDeliveryLock) + synchronized (_messageDeliveryLock) { dispatchMessage(message); } @@ -2713,7 +2718,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcherLogger.isDebugEnabled()) { _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") - + ": Currently " + (currently ? "Stopped" : "Started")); + + ": Currently " + (currently ? "Stopped" : "Started")); } } @@ -2725,7 +2730,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (message.getDeliverBody() != null) { final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); if ((consumer == null) || consumer.isClosed()) { @@ -2734,14 +2739,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " - + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" - + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); + + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } // Don't reject if we're already closing diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index dfac0d45a8..014fd36414 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -240,15 +240,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - // handle case where connection has already been started, and the dispatcher has alreaded started + //todo: handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue - synchronized (_session) - { _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); - } } } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java b/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java new file mode 100644 index 0000000000..a25af30008 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.failure; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.Random; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * DeadlockTestCase: + * From a client requirement. + * + * The JMS Spec specifies that a Session has a single thread of control. And as such setting message listeners from a + * second thread is not allowed. + * Section 4.4.6 of the Spec states: + <quote>Another consequence is that a connection must be in stopped mode to set up a +session with more than one message listener. The reason is that when a +connection is actively delivering messages, once the first message listener for a +session has been registered, the session is now controlled by the thread of +control that delivers messages to it. At this point a client thread of control +cannot be used to further configure the session.</quote> + * + * It, however, does not specified what we should do in the case. it only states: + <quote>Once a connection has been started, all its sessions with a registered message +listener are dedicated to the thread of control that delivers messages to them. It +is erroneous for client code to use such a session from another thread of +control. The only exception to this is the use of the session or connection close +method.</quote> + * + * While it may be erroneous the causing a Deadlock is not a very satisfactory solution. This test ensures that we do + * no do this. There is no technical reason we cannot currently allow the setting of a messageListener on a new consumer. + * The only caveate is due to QPID-577 there is likely to be temporary message 'loss'. As they are stuck on the internal + * _synchronousQueue pending a synchronous receive. + * + */ +public class DeadlockTest extends TestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(DeadlockTest.class); + + + public static final String QPID_BROKER_CONNECTION_PROPERTY = "QPIDBROKER"; + + private String topic1 = "TEST.DeadLock1.TMP"; + private String topic2 = "TEST.DeadLock2.TMP"; + + private Session sess; + + private Semaphore s = new Semaphore(0); + private final String LOCAL = "tcp://localhost:5670"; + private final String VM = "vm://:1"; + + private String BROKER = VM; + + String connectionString = System.getProperty(QPID_BROKER_CONNECTION_PROPERTY, + "amqp://guest:guest@/test?brokerlist='" + BROKER + "'"); + + + public void setUp() throws AMQVMBrokerCreationException + { + if (BROKER.equals("vm://:1")) + { + TransportConnection.createVMBroker(1); + } + } + + public void tearDown() throws AMQVMBrokerCreationException + { + if (BROKER.equals("vm://:1")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public class EmptyMessageListener implements javax.jms.MessageListener + { + public void onMessage(Message message) + { + // do nothing + } + } + + public void setSessionListener(String topic, javax.jms.MessageListener listener) + { + try + { + Topic jmsTopic = sess.createTopic(topic); + MessageConsumer subscriber = sess.createConsumer(jmsTopic); + subscriber.setMessageListener(listener); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Caught JMSException"); + } + } + + public class TestMessageListener implements javax.jms.MessageListener + { + public Random r = new Random(); + + public void onMessage(Message message) + { + if (r.nextBoolean()) + { + setSessionListener(topic2, new EmptyMessageListener()); + } + } + + } + + public void testDeadlock() throws InterruptedException, URLSyntaxException, JMSException + { + // in order to trigger the deadlock we need to + // set a message listener from one thread + // whilst receiving a message on another thread and on that thread also setting (the same?) message listener + AMQConnectionFactory acf = new AMQConnectionFactory(connectionString); + Connection conn = acf.createConnection(); + conn.start(); + sess = conn.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + setSessionListener(topic1, new TestMessageListener()); + + + Thread th = new Thread() + { + public void run() + { + try + { + Topic jmsTopic = sess.createTopic(topic1); + MessageProducer producer = sess.createProducer(jmsTopic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + Random r = new Random(); + long end = System.currentTimeMillis() + 2000; + while (end - System.currentTimeMillis() > 0) + { + if (r.nextBoolean()) + { + _logger.info("***************** send message"); + Message jmsMessage = sess.createTextMessage(""); + producer.send(jmsMessage); + } + else + { + _logger.info("***************** set session listener"); + setSessionListener(topic2, new EmptyMessageListener()); + } + Thread.yield(); + } + _logger.info("done sends"); + s.release(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Caught JMSException"); + } + } + }; + th.setDaemon(true); + th.setName("testDeadlock"); + th.start(); + + boolean success = s.tryAcquire(1, 4, TimeUnit.SECONDS); + + // if we failed, closing the connection will just hang the test case. + if (success) + { + conn.close(); + } + + if (!success) + { + fail("Deadlock ocurred"); + } + } +} |