From 6d143b8c89fa2ec889435c4340698a711abf80ed Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 18 Oct 2007 10:29:45 +0000 Subject: Merged revisions 573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585564,585566-585569,585571-585574,585576-585641,585643-585905,585907-585914 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r585912 | ritchiem | 2007-10-18 11:10:19 +0100 (Thu, 18 Oct 2007) | 1 line QPID-637 : Patch provided by Aidan Skinner to ensure correct behaviour of session closure. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@585918 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 113 +++++++------------ .../apache/qpid/client/BasicMessageConsumer.java | 124 +++++++++++---------- 2 files changed, 110 insertions(+), 127 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fee6690538..a0b79b135d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -112,22 +113,20 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** - * *

*
CRC Card
Responsibilities Collaborations *
*
* * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. + * after looking at worse bottlenecks first. */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -222,9 +221,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - /** - * Holds the highest received delivery tag. - */ + /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); /** Holds the dispatcher thread for this session. */ @@ -236,9 +233,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map _producers = new ConcurrentHashMap(); - /** - * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. - */ + /** Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */ private int _nextTag = 1; /** @@ -374,12 +369,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a new session on a connection with the default message factory factory. * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) @@ -402,12 +397,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return new JMSBytesMessage(); - } + checkNotClosed(); + return new JMSBytesMessage(); } /** @@ -462,9 +453,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, @@ -501,14 +490,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * + * re-opened. May need to examine this more carefully. * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -579,6 +565,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { synchronized (_connection.getFailoverMutex()) { + if (e instanceof AMQDisconnectedException) + { + if (_dispatcher != null) + { + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); + } + } synchronized (_messageDeliveryLock) { // An AMQException has an error code and message already and will be passed in when closure occurs as a @@ -610,7 +604,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException @@ -861,12 +854,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return new JMSMapMessage(); - } + checkNotClosed(); + return new JMSMapMessage(); } public javax.jms.Message createMessage() throws JMSException @@ -876,12 +865,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - - return (ObjectMessage) new JMSObjectMessage(); - } + checkNotClosed(); + return (ObjectMessage) new JMSObjectMessage(); } public ObjectMessage createObjectMessage(Serializable object) throws JMSException @@ -955,7 +940,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1301,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi *

  • Stop message delivery.
  • *
  • Mark all messages that might have been delivered but not acknowledged as "redelivered". *
  • Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.
  • + * Redelivered messages do not have to be delivered in exactly their original delivery order. * * *

    If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and @@ -1311,7 +1295,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. * Not that this does not necessarily mean that the recovery has failed, but simply that it * is not possible to tell if it has or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void recover() throws JMSException @@ -1402,7 +1385,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting delivery tag:" + deliveryTag); + _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode()); } AMQFrame basicRejectBody = @@ -1423,7 +1406,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1695,7 +1677,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return true if the queue is bound to the exchange and routing key, false if not. * * @throws JMSException If the query fails for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) @@ -1768,10 +1749,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. - * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -2138,7 +2118,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2183,9 +2162,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * the client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Verify the destiation is valid or throw an exception. - * * @todo Be aware of possible changes to parameter order as versions change. */ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) @@ -2229,7 +2206,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2404,11 +2380,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _producers.put(new Long(producerId), producer); } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(null, requeue); - } - /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) @@ -2528,7 +2499,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void suspendChannel(boolean suspend) throws AMQException // , FailoverException @@ -2571,6 +2541,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _lock = new Object(); private final AtomicLong _rollbackMark = new AtomicLong(-1); + private String dispatcherID = "" + System.identityHashCode(this); public Dispatcher() { @@ -2606,6 +2577,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + //Let the dispatcher deal with this when it gets to them. // closeConsumer consumer.markClosed(); @@ -2639,8 +2611,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - // should perhaps clear the _SQ here. - // consumer._synchronousQueue.clear(); + // cClear the _SQ here. consumer.clearReceiveQueue(); } @@ -2756,14 +2727,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (consumer == null) { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" + message.getDeliverBody().deliveryTag + "] from queue " + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); } else { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ") [" + + message.getDeliverBody().deliveryTag + "] from queue consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9c5c344275..ddaf0cfd93 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -142,9 +142,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private List _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -221,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + + _destination); } } else @@ -243,9 +243,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //todo: handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue - _messageListener.set(messageListener); - _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); + _messageListener.set(messageListener); + _session.setHasMessageListeners(); + _session.startDistpatcherIfNecessary(); } } } @@ -360,14 +360,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer long endtime = System.currentTimeMillis() + l; while (System.currentTimeMillis() < endtime && o == null) { - try + try { o = _synchronousQueue.poll(endtime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { _logger.warn("Interrupted: " + e); - if (isClosed()) + if (isClosed()) { return null; } @@ -381,11 +381,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { o = _synchronousQueue.take(); - } + } catch (InterruptedException e) { _logger.warn("Interrupted: " + e); - if (isClosed()) + if (isClosed()) { return null; } @@ -516,9 +516,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -577,7 +577,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -609,9 +609,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { @@ -696,15 +696,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -714,43 +714,43 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) + if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } - } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; } } @@ -883,6 +883,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) { + _closed.set(true); _receivingThread.interrupt(); } @@ -938,6 +939,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer Iterator iterator = _synchronousQueue.iterator(); + int initialSize = _synchronousQueue.size(); + + boolean removed = false; while (iterator.hasNext()) { @@ -952,16 +956,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } iterator.remove(); + removed = true; } else { _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); + removed = true; } } + if (removed && (initialSize == _synchronousQueue.size())) + { + _logger.error("Queue had content removed but didn't change in size." + initialSize); + } + + if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); @@ -974,7 +986,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String debugIdentity() { - return String.valueOf(_consumerTag); + return String.valueOf(_consumerTag) + "[" + System.identityHashCode(this) + "]"; } public void clearReceiveQueue() -- cgit v1.2.1