From c5b44d59395ba7ebe8c84ce6461c4e39a0e5b99a Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 14 Feb 2007 08:21:37 +0000 Subject: QPID-346 Message loss after rollback/recover Messages were still occasionally being sent twice. AMQChannel - added trace level logging that will show an error if the same message is attempted to be sent to the same client. AMQMessage - Remove logic that says the same subscriber can take always 'take' the message. SubscriptionImpl - Release message when it is put back on to the resendQueue this will allow it to be re-'taken' AMQSession - Added method to Dispatcher to clean up incomming _queue to try and prevent messages arriving for closed consumers. BasicMessageConsumer - added comments git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@507433 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 14 +++++ .../org/apache/qpid/server/queue/AMQMessage.java | 4 -- .../apache/qpid/server/queue/SubscriptionImpl.java | 4 ++ .../java/org/apache/qpid/client/AMQSession.java | 66 +++++++++++++++++++++- .../apache/qpid/client/BasicMessageConsumer.java | 5 ++ 5 files changed, 87 insertions(+), 6 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e056af55cf..90ab71f703 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -372,6 +372,20 @@ public class AMQChannel + _unacknowledgedMessageMap.size() + ":" + _unacknowledgedMessageMap.toString()); } + //Debug adding messages to this map. + if (_log.isTraceEnabled()) + { + for (Map.Entry entry : _unacknowledgedMessageMap.entrySet()) + { + if (entry.getValue().message == message) + { + // this is set at error level but only output it if we are tracing. + _log.error("Adding message (" + System.identityHashCode(message) + + ") that is already in unacked map entryTag:" + + entry.getKey() + " dT:" + deliveryTag); + } + } + } _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); _lastDeliveryTag = deliveryTag; checkSuspension(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index af7d7ea493..e6f0cc282b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -448,10 +448,6 @@ public class AMQMessage { if (_taken.getAndSet(true)) { - if (sub == _takenBySubcription) - { - return false; - } return true; } else diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e6e3a9cadb..230430ab12 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -537,6 +537,10 @@ public class SubscriptionImpl implements Subscription public void addToResendQueue(AMQMessage msg) { + //fixme - will this be ok as we need to ensure redelivery to same subscriber first + //release the message so it can be redelivered + msg.release(); + // add to our resend queue getResendQueue().add(msg); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e475270ecd..783678f67c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -47,6 +47,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -289,6 +290,61 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + /** + * The dispatcher should be stopped when calling this. + * + * @param consumerTag + */ + public void removePending(String consumerTag) + { + + synchronized (_lock) + { + boolean stopped = connectionStopped(); + + _dispatcher.setConnectionStopped(false); + + LinkedList tmpList = new LinkedList(); + + while (_queue.size() != 0) + { + UnprocessedMessage message = null; + try + { + message = (UnprocessedMessage) _queue.take(); + + if (!message.deliverBody.consumerTag.equals(consumerTag)) + { + tmpList.add(message); + } + else + { + _logger.error("Pruned pending message for consumer:" + consumerTag); + } + } + catch (InterruptedException e) + { + _logger.error("Interrupted whilst taking message"); + } + } + + if (!tmpList.isEmpty()) + { + _logger.error("Tmp list is not empty"); + } + + for (UnprocessedMessage msg : tmpList) + { + _queue.add(msg); + } + + if (stopped) + { + _dispatcher.setConnectionStopped(stopped); + } + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -599,8 +655,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); try { @@ -618,6 +672,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully + // we pass null since this is not an error case + closeProducersAndConsumers(null); + } catch (AMQException e) { @@ -1784,7 +1841,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { + //need to clear pending messages from session _queue that the dispatcher will handle + // or we will get + // _dispatcher.removePending(consumer.getConsumerTag()); + _consumers.remove(consumer.getConsumerTag()); + String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 058afab605..1607326e47 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -481,8 +481,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + + //this will remove consumer from _consumers map deregisterConsumer(); + + // clears unacks from this consumer _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) { _logger.info("Interrupting thread: " + _receivingThread); -- cgit v1.2.1