diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-13 11:46:37 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-13 11:46:37 +0000 |
commit | c257715e5f77e6e3bbddb4ccced0645ac0199698 (patch) | |
tree | 252d4165d626e4c890d73db22dd6122c4d09a5db | |
parent | 01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 (diff) | |
download | qpid-python-c257715e5f77e6e3bbddb4ccced0645ac0199698.tar.gz |
QPID-346 Message loss after rollback/recover
With Multiple consumers closing and requeuing occasionally a message would be lost given the use of msg.getDeliveredToConsumer() as this will be set on the first send an so on the infrequent occasion that a subscriber closes whilst a message is being delivered then that message would be lost.
AMQChannel - Fixed bug where messages would not be requeued on consumer closure. Increased quantity of logging.
AMQMessage - Added method to get 'taken' status.
AMQQueue - Wrapped all log messages with correct is<X>Enabled() also removed debug() method as this makes debugging very difficult (log4j will always report the same log line, requiring searching of the file to fine the actual log line.)
ConcurrentSelectorDeliveryManager - Increased and enclosed logging (isXEnabled). Wrapped the send calls with a lock on the Subscription(Impl) such that a send will not occur if the Subscription(Impl) has been closed.
SubscriptionImpl - Used sendLock to set SI closed. This is used to mark subscription as suspended so no messages will be sent to it. Increased and wrapped logging.
SubscriptionSet - Added locking around the insertion and removal of entries to _subscription. As we need to retrieve the actual Subscription from the map when removing rather than the dummy object created for lookup. This requires two call to _subcription which is there for not thread safe.
log4j - updated to have handy debug defaults that simply need uncommented.
Test to follow, once cleaned up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506979 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 200 insertions, 63 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index 810b65798e..7230a55dbd 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -47,14 +47,25 @@ </category> <category name="org.apache.qpid.framing.AMQDataBlockEncoder"> - <priority value="info"/> - </category> + <priority value="info"/> + </category> + <!--category name="org.apache.qpid.server.queue.SubscriptionImpl"> + <priority value="trace"/> + </category> - <category name="org.apache.qpid"> + <category name="org.apache.qpid.server.queue.ConcurrentSelectorDeliveryManager"> + <priority value="trace"/> + </category> + + <category name="org.apache.qpid.server.AMQChannel"> + <priority value="trace"/> + </category --> + + <category name="org.apache.qpid"> <priority value="warn"/> </category> - + <root> <priority value="info"/> <appender-ref ref="STDOUT"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e1b6497062..4b029f88c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -310,13 +310,15 @@ public class AMQChannel { if (_log.isTraceEnabled()) { - _log.trace("Unsubscribed consumer:" + consumerTag); + _log.trace("Unsubscribed consumer:" + consumerTag + "on Session " + session + + " Unacked Map Size:" + _unacknowledgedMessageMap.size()); } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { q.unregisterProtocolSession(session, _channelId, consumerTag); } + requeue(); } /** @@ -358,15 +360,18 @@ public class AMQChannel */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - if (_log.isTraceEnabled()) - { - _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId + - " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag + - " from queue:" + queue.getName()); - } - synchronized (_unacknowledgedMessageMapLock) { + if (_log.isTraceEnabled()) + { + _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId + + "(" + System.identityHashCode(this) + ")" + + " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag + + " from queue:" + queue.getName() + + " unackedSize[" + System.identityHashCode(_unacknowledgedMessageMap) + "](pre-put):" + + _unacknowledgedMessageMap.size() + ":" + _unacknowledgedMessageMap.toString()); + } + _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); _lastDeliveryTag = deliveryTag; checkSuspension(); @@ -405,6 +410,11 @@ public class AMQChannel unacked.queue.deliver(unacked.message); } } + + if (_unacknowledgedMessageMap.size() != 0) + { + _log.error("unack map is not empty after resend was item added to unack map whilst consumer is closing"); + } } /** Called to resend all outstanding unacknowledged messages to this same channel. */ diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 8ba3cc5686..af7d7ea493 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -471,4 +471,9 @@ public class AMQMessage { return _takenBySubcription; } + + public boolean isTaken() + { + return _taken.get(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 99bf9ca31d..e6882906ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -341,7 +341,12 @@ public class AMQQueue implements Managable, Comparable public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} [{4}] and " + + "consumer tag {2} with {3}", + ps, channel, consumerTag, this, System.identityHashCode(channel))); + } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); @@ -358,8 +363,13 @@ public class AMQQueue implements Managable, Comparable public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException { - debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this); + + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} [{4}] " + + "and consumer tag {2} from {3}", + ps, channel, consumerTag, this, System.identityHashCode(channel))); + } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, @@ -371,6 +381,12 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Removed consumer tag {0} with channel {1} [{3}] from {2}", + consumerTag, channel, this, System.identityHashCode(channel))); + } + removedSubscription.close(); // if we are eligible for auto deletion, unregister from the queue registry @@ -412,7 +428,10 @@ public class AMQQueue implements Managable, Comparable protected void autodelete() throws AMQException { - debug("autodeleting {0}", this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("autodeleting {0}", this)); + } delete(); } @@ -516,14 +535,6 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - private void debug(String msg, Object... args) - { - if (_logger.isDebugEnabled()) - { - _logger.debug(MessageFormat.format(msg, args)); - } - } - public long getMinimumAlertRepeatGap() { return _minimumAlertRepeatGap; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 9b79657575..cf7d5bbc68 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -254,11 +254,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // message will be null if we have no messages in the messageQueue. if (message == null) { + if (_log.isTraceEnabled()) + { + _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } return; } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this)); + _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } sub.send(message, queue); @@ -333,17 +339,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(String name, AMQMessage msg) throws FailedDequeueException { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(id() + "deliver :" + System.identityHashCode(msg)); + _log.debug(id() + " Deliver :" + System.identityHashCode(msg) + ")"); } //Check if we have someone to deliver the message to. _lock.lock(); try { + if (_log.isTraceEnabled()) + { + _log.trace(id() + " Getting next Subscriber for message :" + System.identityHashCode(msg) + ")"); + } + Subscription s = _subscriptions.nextSubscriber(msg); + if (_log.isTraceEnabled()) + { + _log.trace(id() + " Subscriber (" + System.identityHashCode(s) + ")" + + " selected for message :" + System.identityHashCode(msg) + ")"); + } + if (s == null) //no-one can take the message right now. { if (_log.isDebugEnabled()) @@ -392,18 +409,58 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { + if (_log.isTraceEnabled()) + { + _log.trace(id() + " About to take sendLock for subscriber :" + System.identityHashCode(s) + + " to deliver message:" + System.identityHashCode(msg)); + } + //release lock now _lock.unlock(); - if (_log.isDebugEnabled()) + synchronized (s.sendlock()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + - System.identityHashCode(s) + ") :" + s); + if (!s.isSuspended()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + System.identityHashCode(s) + ") :" + s); + } + + //Mark message as taken + msg.taken(s); + //Deliver the message + s.send(msg, _queue); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + } + } + } + + if (!msg.isTaken()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); + } + + deliver(name, msg); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); + } } - //Mark message as taken - msg.taken(s); - //Deliver the message - s.send(msg, _queue); } } finally diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a53e305e49..e6e3a9cadb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -69,7 +69,7 @@ public class SubscriptionImpl implements Subscription private boolean _closed = false; private AMQQueue _queue; - private final AtomicBoolean _resending = new AtomicBoolean(false); + private final AtomicBoolean _sendLock = new AtomicBoolean(false); public static class Factory implements SubscriptionFactory { @@ -193,7 +193,18 @@ public class SubscriptionImpl implements Subscription public String toString() { - return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; + String subscriber = "[channel=" + channel + + ", consumerTag=" + consumerTag + + ", session=" + protocolSession.getKey() + + ", resendQueue=" + (_resendQueue != null); + + if (_resendQueue != null) + { + subscriber += ", resendSize=" + _resendQueue.size(); + } + + + return subscriber + "]"; } /** @@ -239,7 +250,7 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? @@ -275,7 +286,7 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); //fixme what is wrong with this? @@ -292,7 +303,18 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended() && !_resending.get(); + if (_logger.isTraceEnabled()) + { + if (channel.isSuspended()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + } + if (_sendLock.get()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + } + } + return channel.isSuspended() || _sendLock.get(); } /** @@ -386,7 +408,20 @@ public class SubscriptionImpl implements Subscription public void close() { - _logger.info("Closing subscription:" + this); + synchronized (_sendLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting SendLock true"); + } + + _sendLock.set(true); + + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + } if (_resendQueue != null && !_resendQueue.isEmpty()) { @@ -411,17 +446,17 @@ public class SubscriptionImpl implements Subscription )); _closed = true; } + } private void requeue() { - if (_queue != null) { - _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); - - //Take control over to this thread for delivering messages from the Async Delivery. - setResending(true); + if (_logger.isTraceEnabled()) + { + _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); + } while (!_resendQueue.isEmpty()) { @@ -441,8 +476,6 @@ public class SubscriptionImpl implements Subscription } } - setResending(false); - if (!_resendQueue.isEmpty()) { _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); @@ -462,14 +495,6 @@ public class SubscriptionImpl implements Subscription _resendQueue = null; } - private void setResending(boolean resending) - { - synchronized (_resending) - { - _resending.set(resending); - } - } - public boolean isBrowser() { return _isBrowser; @@ -528,7 +553,7 @@ public class SubscriptionImpl implements Subscription public Object sendlock() { - return _resending; + return _sendLock; } private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index c4dab50ff4..83570953c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -32,11 +32,12 @@ class SubscriptionSet implements WeightedSubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); - /** List of registered subscribers */ + /** List of registered subscribers all edits must be done whilst holidng _subscriptionsChange */ private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); /** Used to control the round robin delivery of content */ private int _currentSubscriber; + private final Object _subscriptionsChange = new Object(); /** Accessor for unit tests. */ int getCurrentSubscriber() @@ -46,7 +47,10 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - _subscriptions.add(subscription); + synchronized (_subscriptionsChange) + { + _subscriptions.add(subscription); + } } /** @@ -59,13 +63,27 @@ class SubscriptionSet implements WeightedSubscriptionManager public Subscription removeSubscriber(Subscription subscription) { // TODO: possibly need O(1) operation here. - int subIndex = _subscriptions.indexOf(subscription); - if (subIndex != -1) + Subscription sub = null; + synchronized (_subscriptionsChange) + { + int subIndex = _subscriptions.indexOf(subscription); + + if (subIndex != -1) + { + //we can't just return the passed in subscription as it is a new object + // and doesn't contain the stored state we need. + //NOTE while this may be removed now anyone with an iterator will still have it in the list!! + sub = _subscriptions.remove(subIndex); + } + else + { + _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription); + } + } + if (sub != null) { - //we can't just return the passed in subscription as it is a new object - // and doesn't contain the stored state we need. - return _subscriptions.remove(subIndex); + return sub; } else { |