From c257715e5f77e6e3bbddb4ccced0645ac0199698 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 13 Feb 2007 11:46:37 +0000 Subject: QPID-346 Message loss after rollback/recover With multiple consumers closing and requeuing, occasionally a message would be lost given the use of msg.getDeliveredToConsumer(), as this will be set on the first send, and so on the infrequent occasion that a subscriber closes whilst a message is being delivered, that message would be lost. AMQChannel - Fixed bug where messages would not be requeued on consumer closure. Increased quantity of logging. AMQMessage - Added method to get 'taken' status. AMQQueue - Wrapped all log messages with correct isEnabled() checks; also removed debug() method as this makes debugging very difficult (log4j will always report the same log line, requiring searching of the file to find the actual log line). ConcurrentSelectorDeliveryManager - Increased and enclosed logging (isXEnabled). Wrapped the send calls with a lock on the Subscription(Impl) such that a send will not occur if the Subscription(Impl) has been closed. SubscriptionImpl - Used sendLock to set SI closed. This is used to mark the subscription as suspended so no messages will be sent to it. Increased and wrapped logging. SubscriptionSet - Added locking around the insertion and removal of entries to _subscriptions, as we need to retrieve the actual Subscription from the list when removing rather than the dummy object created for lookup. This requires two calls to _subscriptions, which is therefore not thread safe. log4j - updated to have handy debug defaults that simply need to be uncommented. Test to follow, once cleaned up. 
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506979 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/broker/etc/log4j.xml | 19 ++++-- .../java/org/apache/qpid/server/AMQChannel.java | 26 +++++--- .../org/apache/qpid/server/queue/AMQMessage.java | 5 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 35 ++++++---- .../queue/ConcurrentSelectorDeliveryManager.java | 77 +++++++++++++++++++--- .../apache/qpid/server/queue/SubscriptionImpl.java | 69 ++++++++++++------- .../apache/qpid/server/queue/SubscriptionSet.java | 32 +++++++-- 7 files changed, 200 insertions(+), 63 deletions(-) diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index 810b65798e..7230a55dbd 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -47,14 +47,25 @@ - - + + + + + - + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e1b6497062..4b029f88c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -310,13 +310,15 @@ public class AMQChannel { if (_log.isTraceEnabled()) { - _log.trace("Unsubscribed consumer:" + consumerTag); + _log.trace("Unsubscribed consumer:" + consumerTag + "on Session " + session + + " Unacked Map Size:" + _unacknowledgedMessageMap.size()); } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { q.unregisterProtocolSession(session, _channelId, consumerTag); } + requeue(); } /** @@ -358,15 +360,18 @@ public class AMQChannel */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - if (_log.isTraceEnabled()) - { - _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId + - " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag + 
- " from queue:" + queue.getName()); - } - synchronized (_unacknowledgedMessageMapLock) { + if (_log.isTraceEnabled()) + { + _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId + + "(" + System.identityHashCode(this) + ")" + + " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag + + " from queue:" + queue.getName() + + " unackedSize[" + System.identityHashCode(_unacknowledgedMessageMap) + "](pre-put):" + + _unacknowledgedMessageMap.size() + ":" + _unacknowledgedMessageMap.toString()); + } + _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); _lastDeliveryTag = deliveryTag; checkSuspension(); @@ -405,6 +410,11 @@ public class AMQChannel unacked.queue.deliver(unacked.message); } } + + if (_unacknowledgedMessageMap.size() != 0) + { + _log.error("unack map is not empty after resend was item added to unack map whilst consumer is closing"); + } } /** Called to resend all outstanding unacknowledged messages to this same channel. 
*/ diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 8ba3cc5686..af7d7ea493 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -471,4 +471,9 @@ public class AMQMessage { return _takenBySubcription; } + + public boolean isTaken() + { + return _taken.get(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 99bf9ca31d..e6882906ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -341,7 +341,12 @@ public class AMQQueue implements Managable, Comparable public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} [{4}] and " + + "consumer tag {2} with {3}", + ps, channel, consumerTag, this, System.identityHashCode(channel))); + } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); @@ -358,8 +363,13 @@ public class AMQQueue implements Managable, Comparable public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException { - debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this); + + if (_logger.isDebugEnabled()) + { + 
_logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} [{4}] " + + "and consumer tag {2} from {3}", + ps, channel, consumerTag, this, System.identityHashCode(channel))); + } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, @@ -371,6 +381,12 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Removed consumer tag {0} with channel {1} [{3}] from {2}", + consumerTag, channel, this, System.identityHashCode(channel))); + } + removedSubscription.close(); // if we are eligible for auto deletion, unregister from the queue registry @@ -412,7 +428,10 @@ public class AMQQueue implements Managable, Comparable protected void autodelete() throws AMQException { - debug("autodeleting {0}", this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("autodeleting {0}", this)); + } delete(); } @@ -516,14 +535,6 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - private void debug(String msg, Object... 
args) - { - if (_logger.isDebugEnabled()) - { - _logger.debug(MessageFormat.format(msg, args)); - } - } - public long getMinimumAlertRepeatGap() { return _minimumAlertRepeatGap; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 9b79657575..cf7d5bbc68 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -254,11 +254,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // message will be null if we have no messages in the messageQueue. if (message == null) { + if (_log.isTraceEnabled()) + { + _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } return; } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this)); + _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } sub.send(message, queue); @@ -333,17 +339,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(String name, AMQMessage msg) throws FailedDequeueException { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(id() + "deliver :" + System.identityHashCode(msg)); + _log.debug(id() + " Deliver :" + System.identityHashCode(msg) + ")"); } //Check if we have someone to deliver the message to. 
_lock.lock(); try { + if (_log.isTraceEnabled()) + { + _log.trace(id() + " Getting next Subscriber for message :" + System.identityHashCode(msg) + ")"); + } + Subscription s = _subscriptions.nextSubscriber(msg); + if (_log.isTraceEnabled()) + { + _log.trace(id() + " Subscriber (" + System.identityHashCode(s) + ")" + + " selected for message :" + System.identityHashCode(msg) + ")"); + } + if (s == null) //no-one can take the message right now. { if (_log.isDebugEnabled()) @@ -392,18 +409,58 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { + if (_log.isTraceEnabled()) + { + _log.trace(id() + " About to take sendLock for subscriber :" + System.identityHashCode(s) + + " to deliver message:" + System.identityHashCode(msg)); + } + //release lock now _lock.unlock(); - if (_log.isDebugEnabled()) + synchronized (s.sendlock()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + - System.identityHashCode(s) + ") :" + s); + if (!s.isSuspended()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + System.identityHashCode(s) + ") :" + s); + } + + //Mark message as taken + msg.taken(s); + //Deliver the message + s.send(msg, _queue); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + } + } + } + + if (!msg.isTaken()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); + } + + deliver(name, msg); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); + } } - //Mark message as taken - msg.taken(s); - //Deliver the message - 
s.send(msg, _queue); } } finally diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a53e305e49..e6e3a9cadb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -69,7 +69,7 @@ public class SubscriptionImpl implements Subscription private boolean _closed = false; private AMQQueue _queue; - private final AtomicBoolean _resending = new AtomicBoolean(false); + private final AtomicBoolean _sendLock = new AtomicBoolean(false); public static class Factory implements SubscriptionFactory { @@ -193,7 +193,18 @@ public class SubscriptionImpl implements Subscription public String toString() { - return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; + String subscriber = "[channel=" + channel + + ", consumerTag=" + consumerTag + + ", session=" + protocolSession.getKey() + + ", resendQueue=" + (_resendQueue != null); + + if (_resendQueue != null) + { + subscriber += ", resendSize=" + _resendQueue.size(); + } + + + return subscriber + "]"; } /** @@ -239,7 +250,7 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? 
@@ -275,7 +286,7 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? @@ -292,7 +303,18 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended() && !_resending.get(); + if (_logger.isTraceEnabled()) + { + if (channel.isSuspended()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + } + if (_sendLock.get()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + } + } + return channel.isSuspended() || _sendLock.get(); } /** @@ -386,7 +408,20 @@ public class SubscriptionImpl implements Subscription public void close() { - _logger.info("Closing subscription:" + this); + synchronized (_sendLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting SendLock true"); + } + + _sendLock.set(true); + + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + } if (_resendQueue != null && !_resendQueue.isEmpty()) { @@ -411,17 +446,17 @@ public class SubscriptionImpl implements Subscription )); _closed = true; } + } private void requeue() { - if (_queue != null) { - _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); - - //Take control over to this thread for delivering messages from the Async Delivery. 
- setResending(true); + if (_logger.isTraceEnabled()) + { + _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); + } while (!_resendQueue.isEmpty()) { @@ -441,8 +476,6 @@ public class SubscriptionImpl implements Subscription } } - setResending(false); - if (!_resendQueue.isEmpty()) { _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); @@ -462,14 +495,6 @@ public class SubscriptionImpl implements Subscription _resendQueue = null; } - private void setResending(boolean resending) - { - synchronized (_resending) - { - _resending.set(resending); - } - } - public boolean isBrowser() { return _isBrowser; @@ -528,7 +553,7 @@ public class SubscriptionImpl implements Subscription public Object sendlock() { - return _resending; + return _sendLock; } private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index c4dab50ff4..83570953c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -32,11 +32,12 @@ class SubscriptionSet implements WeightedSubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); - /** List of registered subscribers */ + /** List of registered subscribers all edits must be done whilst holidng _subscriptionsChange */ private List _subscriptions = new CopyOnWriteArrayList(); /** Used to control the round robin delivery of content */ private int _currentSubscriber; + private final Object _subscriptionsChange = new Object(); /** Accessor for unit tests. 
*/ int getCurrentSubscriber() @@ -46,7 +47,10 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - _subscriptions.add(subscription); + synchronized (_subscriptionsChange) + { + _subscriptions.add(subscription); + } } /** @@ -59,13 +63,27 @@ class SubscriptionSet implements WeightedSubscriptionManager public Subscription removeSubscriber(Subscription subscription) { // TODO: possibly need O(1) operation here. - int subIndex = _subscriptions.indexOf(subscription); - if (subIndex != -1) + Subscription sub = null; + synchronized (_subscriptionsChange) + { + int subIndex = _subscriptions.indexOf(subscription); + + if (subIndex != -1) + { + //we can't just return the passed in subscription as it is a new object + // and doesn't contain the stored state we need. + //NOTE while this may be removed now anyone with an iterator will still have it in the list!! + sub = _subscriptions.remove(subIndex); + } + else + { + _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription); + } + } + if (sub != null) { - //we can't just return the passed in subscription as it is a new object - // and doesn't contain the stored state we need. - return _subscriptions.remove(subIndex); + return sub; } else { -- cgit v1.2.1